You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/04/03 03:15:32 UTC
[kylin] 07/08: KYLIN-3315 allow each project to set its own source
This is an automated email from the ASF dual-hosted git repository.
liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 54bdae63cfca137c02cdf8749f93d152a9ac2010
Author: lidongsjtu <li...@apache.org>
AuthorDate: Tue Apr 3 11:11:36 2018 +0800
KYLIN-3315 allow each project to set its own source
---
.../test/java/org/apache/kylin/job/DeployUtil.java | 4 +-
.../java/org/apache/kylin/cube/CubeManager.java | 4 +-
.../org/apache/kylin/dict/lookup/SnapshotCLI.java | 4 +-
.../kylin/metadata/TableMetadataManager.java | 4 +-
.../kylin/metadata/model/ExternalFilterDesc.java | 21 ++-
.../apache/kylin/metadata/model/ISourceAware.java | 4 +
.../org/apache/kylin/metadata/model/TableDesc.java | 13 +-
.../kylin/metadata/project/ProjectInstance.java | 7 +-
.../main/java/org/apache/kylin/source/ISource.java | 8 +-
.../org/apache/kylin/source/SourceFactory.java | 62 ---------
.../org/apache/kylin/source/SourceManager.java | 154 +++++++++++++++++++++
.../java/org/apache/kylin/engine/mr/MRUtil.java | 10 +-
.../kylin/engine/mr/common/JobRelatedMetaUtil.java | 4 +-
.../kylin/provision/BuildCubeWithStream.java | 4 +-
.../org/apache/kylin/source/SourceManagerTest.java | 61 ++++++++
.../source/hive/ITHiveSourceTableLoaderTest.java | 4 +-
.../kylin/source/hive/ITSnapshotManagerTest.java | 4 +-
.../source/jdbc/ITJdbcSourceTableLoaderTest.java | 9 +-
.../kylin/source/jdbc/ITJdbcTableReaderTest.java | 5 +
.../kylin/query/schema/OLAPSchemaFactory.java | 25 ++--
.../org/apache/kylin/query/util/PushDownUtil.java | 3 +-
.../kylin/rest/controller/TableController.java | 8 +-
.../apache/kylin/rest/job/StorageCleanupJob.java | 4 +-
.../org/apache/kylin/rest/service/JobService.java | 4 +-
.../apache/kylin/rest/service/TableService.java | 55 ++++----
.../org/apache/kylin/source/hive/HiveSource.java | 12 +-
.../org/apache/kylin/source/jdbc/JdbcSource.java | 11 +-
.../org/apache/kylin/source/kafka/KafkaSource.java | 47 +++++--
.../tool/metrics/systemcube/KylinTableCreator.java | 15 +-
29 files changed, 401 insertions(+), 169 deletions(-)
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index a418dc9..524c2e4 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -47,7 +47,7 @@ import org.apache.kylin.metadata.model.DataModelManager;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.ISampleDataDeployer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.datagen.ModelDataGenerator;
import org.apache.kylin.source.kafka.TimedJsonStreamParser;
import org.apache.maven.model.Model;
@@ -231,7 +231,7 @@ public class DeployUtil {
}
tempDir.deleteOnExit();
- ISampleDataDeployer sampleDataDeployer = SourceFactory.getSource(model.getRootFactTable().getTableDesc())
+ ISampleDataDeployer sampleDataDeployer = SourceManager.getSource(model.getRootFactTable().getTableDesc())
.getSampleDataDeployer();
// create hive tables
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index b8deadb..fc2ad3d 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -69,7 +69,7 @@ import org.apache.kylin.metadata.realization.IRealizationProvider;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.IReadableTable;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.SourcePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1052,7 +1052,7 @@ public class CubeManager implements IRealizationProvider {
SnapshotManager snapshotMgr = getSnapshotManager();
TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
- IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+ IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
index 2093d23..f965d18 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
public class SnapshotCLI {
@@ -42,7 +42,7 @@ public class SnapshotCLI {
if (tableDesc == null)
throw new IllegalArgumentException("Not table found by " + table);
- SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceFactory.createReadableTable(tableDesc), tableDesc, overwriteUUID);
+ SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceManager.createReadableTable(tableDesc), tableDesc, overwriteUUID);
System.out.println("resource path updated: " + snapshot.getResourcePath());
}
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
index 42233b7..116e210 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
@@ -118,7 +118,7 @@ public class TableMetadataManager {
@Override
protected TableDesc initEntityAfterReload(TableDesc t, String resourceName) {
String prj = TableDesc.parseResourcePath(resourceName).getSecond();
- t.init(prj);
+ t.init(config, prj);
return t;
}
};
@@ -237,7 +237,7 @@ public class TableMetadataManager {
public void saveSourceTable(TableDesc srcTable, String prj) throws IOException {
try (AutoLock lock = srcTableMapLock.lockForWrite()) {
- srcTable.init(prj);
+ srcTable.init(config, prj);
srcTableCrud.save(srcTable);
}
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java
index 35018c7..7ef84aa 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java
@@ -18,6 +18,7 @@
package org.apache.kylin.metadata.model;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.metadata.filter.function.Functions;
@@ -62,7 +63,7 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
public String resourceName() {
return name;
}
-
+
public String getFilterResourceIdentifier() {
return filterResourceIdentifier;
}
@@ -94,7 +95,8 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
@Override
public String toString() {
- return "ExternalFilterDesc [ name=" + name + " filter table resource identifier " + this.filterResourceIdentifier + "]";
+ return "ExternalFilterDesc [ name=" + name + " filter table resource identifier "
+ + this.filterResourceIdentifier + "]";
}
/** create a mockup table for unit test */
@@ -104,11 +106,6 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
return mockup;
}
- @Override
- public int getSourceType() {
- return sourceType;
- }
-
public void setSourceType(int sourceType) {
this.sourceType = sourceType;
}
@@ -120,4 +117,14 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
public void setDescription(String description) {
this.description = description;
}
+
+ @Override
+ public int getSourceType() {
+ return sourceType;
+ }
+
+ @Override
+ public KylinConfig getConfig() {
+ return null;
+ }
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
index 7ab1bca..eab3e2c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
@@ -18,6 +18,8 @@
package org.apache.kylin.metadata.model;
+import org.apache.kylin.common.KylinConfig;
+
public interface ISourceAware {
public static final int ID_HIVE = 0;
@@ -27,4 +29,6 @@ public interface ISourceAware {
public static final int ID_JDBC = 8;
int getSourceType();
+
+ KylinConfig getConfig();
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index be278de..a9e9877 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.util.Pair;
@@ -98,6 +99,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
private String dataGen;
private String project;
+ private KylinConfig config;
private DatabaseDesc database = new DatabaseDesc();
private String identity = null;
private boolean isBorrowedFromGlobal = false;
@@ -121,6 +123,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
}
this.project = other.project;
+ this.config = other.config;
this.database.setName(other.getDatabase());
this.identity = other.identity;
}
@@ -287,9 +290,10 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
return dataGen;
}
- public void init(String project) {
+ public void init(KylinConfig config, String project) {
this.project = project;
-
+ this.config = config;
+
if (name != null)
name = name.toUpperCase();
@@ -372,6 +376,11 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
return sourceType;
}
+ @Override
+ public KylinConfig getConfig() {
+ return config;
+ }
+
public void setSourceType(int sourceType) {
this.sourceType = sourceType;
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 9b7aaf2..45622f3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -31,6 +31,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.realization.RealizationType;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -47,7 +48,7 @@ import com.google.common.collect.Lists;
*/
@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class ProjectInstance extends RootPersistentEntity {
+public class ProjectInstance extends RootPersistentEntity implements ISourceAware {
public static final String DEFAULT_PROJECT_NAME = "default";
@@ -338,4 +339,8 @@ public class ProjectInstance extends RootPersistentEntity {
return "ProjectDesc [name=" + name + "]";
}
+ @Override
+ public int getSourceType() {
+ return getConfig().getDefaultSource();
+ }
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 42548ae..2c5a922 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -18,13 +18,15 @@
package org.apache.kylin.source;
+import java.io.Closeable;
+
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TableDesc;
/**
* Represents a kind of source to Kylin, like Hive.
*/
-public interface ISource {
+public interface ISource extends Closeable {
/**
* Return an explorer to sync table metadata from the data source.
@@ -41,13 +43,13 @@ public interface ISource {
* Return a ReadableTable that can iterate through the rows of given table.
*/
IReadableTable createReadableTable(TableDesc tableDesc);
-
+
/**
* Give the source a chance to enrich a SourcePartition before build start.
* Particularly, Kafka source use this chance to define start/end offsets within each partition.
*/
SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition);
-
+
/**
* Return an object that is responsible for deploying sample (CSV) data to the source database.
* For testing purpose.
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
deleted file mode 100644
index 365b505..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source;
-
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ImplementationSwitch;
-import org.apache.kylin.metadata.model.ISourceAware;
-import org.apache.kylin.metadata.model.TableDesc;
-
-public class SourceFactory {
-
- // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads.
- private static ThreadLocal<ImplementationSwitch<ISource>> sources = new ThreadLocal<>();
-
- private static ISource getSource(int sourceType) {
- ImplementationSwitch<ISource> current = sources.get();
- if (current == null) {
- current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getSourceEngines(), ISource.class);
- sources.set(current);
- }
- return current.get(sourceType);
- }
-
- public static ISource getDefaultSource() {
- return getSource(KylinConfig.getInstanceFromEnv().getDefaultSource());
- }
-
- public static ISource getSource(ISourceAware aware) {
- return getSource(aware.getSourceType());
- }
-
- public static IReadableTable createReadableTable(TableDesc table) {
- return getSource(table).createReadableTable(table);
- }
-
- public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
- return getSource(table).adaptToBuildEngine(engineInterface);
- }
-
- public static List<String> getMRDependentResources(TableDesc table) {
- return getSource(table).getSourceMetadataExplorer().getRelatedKylinResources(table);
- }
-
-}
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java
new file mode 100644
index 0000000..62c4368
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public class SourceManager {
+ private static final Logger logger = LoggerFactory.getLogger(SourceManager.class);
+
+ private final KylinConfig systemConfig;
+ private final Cache<String, ISource> sourceMap;
+
+ public static SourceManager getInstance(KylinConfig config) {
+ return config.getManager(SourceManager.class);
+ }
+
+ // called by reflection
+ static SourceManager newInstance(KylinConfig config) throws IOException {
+ return new SourceManager(config);
+ }
+
+ // ============================================
+
+ private SourceManager(KylinConfig config) {
+ this.systemConfig = config;
+ this.sourceMap = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.DAYS)
+ .removalListener(new RemovalListener<String, ISource>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, ISource> entry) {
+ ISource s = entry.getValue();
+ if (s != null) {
+ try {
+ s.close();
+ } catch (Throwable e) {
+ logger.error("Failed to close ISource: {}", s.getClass().getName(), e);
+ }
+ }
+ }
+ }).build();
+ }
+
+ public ISource getCachedSource(ISourceAware aware) {
+ String key = createSourceCacheKey(aware);
+ ISource source = sourceMap.getIfPresent(key);
+ if (source != null)
+ return source;
+
+ synchronized (this) {
+ source = sourceMap.getIfPresent(key);
+ if (source != null)
+ return source;
+
+ source = createSource(aware);
+ sourceMap.put(key, source);
+ return source;
+ }
+ }
+
+ public ISource getProjectSource(String projectName) {
+ ProjectInstance projectInstance = ProjectManager.getInstance(systemConfig).getProject(projectName);
+ if (projectInstance != null)
+ return getCachedSource(projectInstance);
+ else
+ return getDefaultSource();
+ }
+
+ private String createSourceCacheKey(ISourceAware aware) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(aware.getSourceType()).append('|');
+
+ KylinConfig config = aware.getConfig();
+ builder.append(config.getJdbcSourceConnectionUrl()).append('|');
+ builder.append(config.getJdbcSourceDriver()).append('|');
+ builder.append(config.getJdbcSourceUser()).append('|');
+ builder.append(config.getJdbcSourceFieldDelimiter()).append('|');
+ builder.append(config.getJdbcSourceDialect()).append('|');
+ return builder.toString(); // jdbc password not needed, because url+user should be identical.
+ }
+
+ private ISource createSource(ISourceAware aware) {
+ String sourceClazz = systemConfig.getSourceEngines().get(aware.getSourceType());
+ try {
+ return ClassUtil.forName(sourceClazz, ISource.class).getDeclaredConstructor(KylinConfig.class)
+ .newInstance(aware.getConfig());
+ } catch (Throwable e) {
+ logger.error("Failed to create source: SourceType={}", aware.getSourceType(), e);
+ return null;
+ }
+ }
+
+ // ==========================================================
+
+ public static ISource getSource(ISourceAware aware) {
+ return getInstance(aware.getConfig()).getCachedSource(aware);
+ }
+
+ public static ISource getDefaultSource() {
+ final KylinConfig sysConfig = KylinConfig.getInstanceFromEnv();
+ return getSource(new ISourceAware() {
+ @Override
+ public int getSourceType() {
+ return sysConfig.getDefaultSource();
+ }
+
+ @Override
+ public KylinConfig getConfig() {
+ return sysConfig;
+ }
+ });
+ }
+
+ public static IReadableTable createReadableTable(TableDesc table) {
+ return getSource(table).createReadableTable(table);
+ }
+
+ public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
+ return getSource(table).adaptToBuildEngine(engineInterface);
+ }
+
+ public static List<String> getMRDependentResources(TableDesc table) {
+ return getSource(table).getSourceMetadataExplorer().getRelatedKylinResources(table);
+ }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 124e5e7..3a9d0ed 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -33,23 +33,23 @@ import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
import org.apache.kylin.storage.StorageFactory;
public class MRUtil {
public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
- return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
+ return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
}
public static IMRTableInputFormat getTableInputFormat(String tableName, String prj) {
TableDesc t = getTableDesc(tableName, prj);
- return SourceFactory.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t);
+ return SourceManager.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t);
}
public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
- return SourceFactory.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
+ return SourceManager.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
}
private static TableDesc getTableDesc(String tableName, String prj) {
@@ -73,7 +73,7 @@ public class MRUtil {
}
public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
- return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
+ return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
}
public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
index c34245b..2cd1841 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
@@ -24,7 +24,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@ public class JobRelatedMetaUtil {
for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
TableDesc table = tableRef.getTableDesc();
dumpList.add(table.getResourcePath());
- dumpList.addAll(SourceFactory.getMRDependentResources(table));
+ dumpList.addAll(SourceManager.getMRDependentResources(table));
}
return dumpList;
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 181e8b9..216ccc1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -62,7 +62,7 @@ import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.rest.job.StorageCleanupJob;
import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.BrokerConfig;
@@ -282,7 +282,7 @@ public class BuildCubeWithStream {
protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
CubeInstance cubeInstance = cubeManager.getCube(cubeName);
- ISource source = SourceFactory.getSource(cubeInstance);
+ ISource source = SourceManager.getSource(cubeInstance);
SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance,
new SourcePartition(null, new SegmentRange(startOffset, endOffset), null, null));
CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition);
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java
new file mode 100644
index 0000000..1d7440e
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.ISourceAware;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class SourceManagerTest extends LocalFileMetadataTestCase {
+ @BeforeClass
+ public static void beforeClass() {
+ staticCreateTestMetadata();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ cleanAfterClass();
+ }
+
+ @Test
+ public void testGetSource() {
+ final KylinConfig config = getTestConfig();
+ SourceManager sourceManager = SourceManager.getInstance(config);
+ ISource source = sourceManager.getCachedSource(new ISourceAware() {
+ @Override
+ public int getSourceType() {
+ return config.getDefaultSource();
+ }
+
+ @Override
+ public KylinConfig getConfig() {
+ return config;
+ }
+ });
+
+ Assert.assertEquals(config.getSourceEngines().get(config.getDefaultSource()), source.getClass().getName());
+ Assert.assertEquals(source, SourceManager.getDefaultSource());
+ Assert.assertEquals(source, SourceManager.getInstance(getTestConfig()).getProjectSource(null));
+ }
+
+}
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
index 8e57bed..a5aea1b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -26,7 +26,7 @@ import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -45,7 +45,7 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
@Test
public void test() throws Exception {
- ISource source = SourceFactory.getDefaultSource();
+ ISource source = SourceManager.getDefaultSource();
ISourceMetadataExplorer explr = source.getSourceMetadataExplorer();
Pair<TableDesc, TableExtDesc> pair;
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
index 384aa95..031da29 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.IReadableTable.TableReader;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -56,7 +56,7 @@ public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
public void basicTest() throws Exception {
String tableName = "EDW.TEST_SITES";
TableDesc tableDesc = TableMetadataManager.getInstance(getTestConfig()).getTableDesc(tableName, "default");
- IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+ IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
snapshotMgr.wipeoutCache();
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
index 3869cb6..557e2e7 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
@@ -35,7 +35,7 @@ import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.query.H2Database;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.datagen.ModelDataGenerator;
import org.junit.After;
import org.junit.Before;
@@ -94,7 +94,7 @@ public class ITJdbcSourceTableLoaderTest extends LocalFileMetadataTestCase imple
@Test
public void test() throws Exception {
- ISource source = SourceFactory.getSource(new ITJdbcSourceTableLoaderTest());
+ ISource source = SourceManager.getSource(new ITJdbcSourceTableLoaderTest());
ISourceMetadataExplorer explr = source.getSourceMetadataExplorer();
Pair<TableDesc, TableExtDesc> pair;
@@ -111,4 +111,9 @@ public class ITJdbcSourceTableLoaderTest extends LocalFileMetadataTestCase imple
return ISourceAware.ID_JDBC;
}
+ @Override
+ public KylinConfig getConfig() {
+ return config;
+ }
+
}
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
index 4a5bfe4..4441178 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
@@ -106,4 +106,9 @@ public class ITJdbcTableReaderTest extends LocalFileMetadataTestCase implements
return ISourceAware.ID_JDBC;
}
+ @Override
+ public KylinConfig getConfig() {
+ return getTestConfig();
+ }
+
}
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
index 25baf55..a1935fe 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
@@ -49,19 +49,22 @@ public class OLAPSchemaFactory implements SchemaFactory {
@Override
public Schema create(SchemaPlus parentSchema, String schemaName, Map<String, Object> operand) {
String project = (String) operand.get(SCHEMA_PROJECT);
- Schema newSchema = new OLAPSchema(project, schemaName, exposeMore());
+ Schema newSchema = new OLAPSchema(project, schemaName, exposeMore(project));
return newSchema;
}
private static Map<String, File> cachedJsons = Maps.newConcurrentMap();
- public static boolean exposeMore() {
- return KylinConfig.getInstanceFromEnv().isPushDownEnabled();
+ public static boolean exposeMore(String project) {
+ return ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project).getConfig()
+ .isPushDownEnabled();
}
public static File createTempOLAPJson(String project, KylinConfig config) {
- Collection<TableDesc> tables = ProjectManager.getInstance(config).listExposedTables(project, exposeMore());
+ ProjectManager projectManager = ProjectManager.getInstance(config);
+ KylinConfig projConfig = projectManager.getProject(project).getConfig();
+ Collection<TableDesc> tables = projectManager.listExposedTables(project, exposeMore(project));
// "database" in TableDesc correspond to our schema
// the logic to decide which schema to be "default" in calcite:
@@ -92,17 +95,16 @@ public class OLAPSchemaFactory implements SchemaFactory {
int counter = 0;
-
-
+ String schemaFactory = projConfig.getSchemaFactory();
for (String schemaName : schemaCounts.keySet()) {
out.append(" {\n");
out.append(" \"type\": \"custom\",\n");
out.append(" \"name\": \"" + schemaName + "\",\n");
- out.append(" \"factory\": \"" + KylinConfig.getInstanceFromEnv().getSchemaFactory()+ "\",\n");
+ out.append(" \"factory\": \"" + schemaFactory + "\",\n");
out.append(" \"operand\": {\n");
out.append(" \"" + SCHEMA_PROJECT + "\": \"" + project + "\"\n");
out.append(" },\n");
- createOLAPSchemaFunctions(out);
+ createOLAPSchemaFunctions(projConfig.getUDFs(), out);
out.append(" }\n");
if (++counter != schemaCounts.size()) {
@@ -132,9 +134,12 @@ public class OLAPSchemaFactory implements SchemaFactory {
}
}
- private static void createOLAPSchemaFunctions(StringBuilder out) throws IOException {
+ private static void createOLAPSchemaFunctions(Map<String, String> definedUdfs, StringBuilder out)
+ throws IOException {
Map<String, String> udfs = Maps.newHashMap();
- udfs.putAll(KylinConfig.getInstanceFromEnv().getUDFs());
+ if (definedUdfs != null)
+ udfs.putAll(definedUdfs);
+
for (Entry<String, Class<?>> entry : MeasureTypeFactory.getUDAFs().entrySet()) {
udfs.put(entry.getKey(), entry.getValue().getName());
}
diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index cfe16c0..7c88141 100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -47,6 +47,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
import org.apache.kylin.metadata.realization.NoRealizationFoundException;
import org.apache.kylin.metadata.realization.RoutingIndicatorException;
@@ -74,7 +75,7 @@ public class PushDownUtil {
private static Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownQuery(String project, String sql,
String defaultSchema, SQLException sqlException, boolean isSelect, boolean isPrepare) throws Exception {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ KylinConfig kylinConfig = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project).getConfig();
if (!kylinConfig.isPushDownEnabled())
return null;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 35849f0..7ada8cc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -182,9 +182,9 @@ public class TableController extends BasicController {
*/
@RequestMapping(value = "/hive", method = { RequestMethod.GET }, produces = { "application/json" })
@ResponseBody
- private List<String> showHiveDatabases() throws IOException {
+ private List<String> showHiveDatabases(@RequestParam(value = "project", required = false) String project) throws IOException {
try {
- return tableService.getHiveDbNames();
+ return tableService.getSourceDbNames(project);
} catch (Throwable e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e.getLocalizedMessage());
@@ -199,9 +199,9 @@ public class TableController extends BasicController {
*/
@RequestMapping(value = "/hive/{database}", method = { RequestMethod.GET }, produces = { "application/json" })
@ResponseBody
- private List<String> showHiveTables(@PathVariable String database) throws IOException {
+ private List<String> showHiveTables(@PathVariable String database, @RequestParam(value = "project", required = false) String project) throws IOException {
try {
- return tableService.getHiveTableNames(database);
+ return tableService.getSourceTableNames(project, database);
} catch (Throwable e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e.getLocalizedMessage());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index 59bd21f..114050c 100755
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -52,7 +52,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,7 +133,7 @@ public class StorageCleanupJob extends AbstractApplication {
}
protected List<String> getHiveTables() throws Exception {
- ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+ ISourceMetadataExplorer explr = SourceManager.getDefaultSource().getSourceMetadataExplorer();
return explr.listTables(KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable());
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 6afc568..4317ed5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -61,7 +61,7 @@ import org.apache.kylin.rest.msg.Message;
import org.apache.kylin.rest.msg.MsgPicker;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
import org.slf4j.Logger;
@@ -230,7 +230,7 @@ public class JobService extends BasicService implements InitializingBean {
CubeSegment newSeg = null;
try {
if (buildType == CubeBuildTypeEnum.BUILD) {
- ISource source = SourceFactory.getSource(cube);
+ ISource source = SourceManager.getSource(cube);
SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart,
sourcePartitionOffsetEnd);
src = source.enrichSourcePartitionBeforeBuild(cube, src);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index d737a6a..ace1686 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -29,8 +29,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import javax.annotation.Nullable;
-
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
@@ -40,11 +38,11 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.msg.Message;
@@ -52,7 +50,7 @@ import org.apache.kylin.rest.msg.MsgPicker;
import org.apache.kylin.rest.response.TableDescResponse;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -64,8 +62,6 @@ import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.SetMultimap;
@@ -136,7 +132,7 @@ public class TableService extends BasicService {
for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
TableDesc tableDesc = pair.getFirst();
TableExtDesc extDesc = pair.getSecond();
-
+
TableDesc origTable = metaMgr.getTableDesc(tableDesc.getIdentity(), project);
if (origTable == null || origTable.getProject() == null) {
tableDesc.setUuid(UUID.randomUUID().toString());
@@ -157,7 +153,7 @@ public class TableService extends BasicService {
}
extDesc.init(project);
metaMgr.saveTableExt(extDesc, project);
-
+
saved.add(tableDesc.getIdentity());
}
@@ -176,14 +172,15 @@ public class TableService extends BasicService {
// load all tables first
List<Pair<TableDesc, TableExtDesc>> allMeta = Lists.newArrayList();
- ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+ ProjectInstance projectInstance = getProjectManager().getProject(project);
+ ISourceMetadataExplorer explr = SourceManager.getSource(projectInstance).getSourceMetadataExplorer();
for (Map.Entry<String, String> entry : db2tables.entries()) {
Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(entry.getKey(), entry.getValue(), project);
TableDesc tableDesc = pair.getFirst();
Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey().toUpperCase()));
Preconditions.checkState(tableDesc.getName().equals(entry.getValue().toUpperCase()));
- Preconditions.checkState(tableDesc.getIdentity().equals(entry.getKey().toUpperCase() + "." + entry
- .getValue().toUpperCase()));
+ Preconditions.checkState(tableDesc.getIdentity()
+ .equals(entry.getKey().toUpperCase() + "." + entry.getValue().toUpperCase()));
TableExtDesc extDesc = pair.getSecond();
Preconditions.checkState(tableDesc.getIdentity().equals(extDesc.getIdentity()));
allMeta.add(pair);
@@ -191,7 +188,8 @@ public class TableService extends BasicService {
return allMeta;
}
- public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile) throws Exception {
+ public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile)
+ throws Exception {
aclEvaluate.checkProjectAdminPermission(project);
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
Map<String, String[]> result = new HashMap<String, String[]>();
@@ -258,13 +256,14 @@ public class TableService extends BasicService {
tableName = normalizeHiveTableName(tableName);
TableDesc desc = getTableManager().getTableDesc(tableName, project);
-
+
// unload of legacy global table is not supported for now
if (desc == null || desc.getProject() == null) {
- logger.warn("Unload Table {} in Project {} failed, could not find TableDesc or related Project", tableName, project);
+ logger.warn("Unload Table {} in Project {} failed, could not find TableDesc or related Project", tableName,
+ project);
return false;
}
-
+
tableType = desc.getSourceType();
if (!modelService.isTableInModel(desc, project)) {
@@ -274,7 +273,7 @@ public class TableService extends BasicService {
List<String> models = modelService.getModelsUsingTable(desc, project);
throw new BadRequestException(String.format(msg.getTABLE_IN_USE_BY_MODEL(), models));
}
-
+
// it is a project local table, ready to remove since no model is using it within the project
TableMetadataManager metaMgr = getTableManager();
metaMgr.removeTableExt(tableName, project);
@@ -313,30 +312,27 @@ public class TableService extends BasicService {
/**
*
+ * @param project
* @return
* @throws Exception
*/
- public List<String> getHiveDbNames() throws Exception {
- ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+ public List<String> getSourceDbNames(String project) throws Exception {
+ ISourceMetadataExplorer explr = SourceManager.getInstance(getConfig()).getProjectSource(project)
+ .getSourceMetadataExplorer();
return explr.listDatabases();
}
/**
*
+ * @param project
* @param database
* @return
* @throws Exception
*/
- public List<String> getHiveTableNames(String database) throws Exception {
- ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
- List<String> hiveTableNames = explr.listTables(database);
- Iterable<String> kylinApplicationTableNames = Iterables.filter(hiveTableNames, new Predicate<String>() {
- @Override
- public boolean apply(@Nullable String input) {
- return input != null && !input.startsWith(MetadataConstants.KYLIN_INTERMEDIATE_PREFIX);
- }
- });
- return Lists.newArrayList(kylinApplicationTableNames);
+ public List<String> getSourceTableNames(String project, String database) throws Exception {
+ ISourceMetadataExplorer explr = SourceManager.getInstance(getConfig()).getProjectSource(project)
+ .getSourceMetadataExplorer();
+ return explr.listTables(database);
}
private TableDescResponse cloneTableDesc(TableDesc table, String prj) {
@@ -355,7 +351,8 @@ public class TableService extends BasicService {
if (cards.length > i) {
cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
} else {
- logger.error("The result cardinality is not identical with hive table metadata, cardinality : " + scard + " column array length: " + cdescs.length);
+ logger.error("The result cardinality is not identical with hive table metadata, cardinality : "
+ + scard + " column array length: " + cdescs.length);
break;
}
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index 129098c..58bd2c3 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -18,6 +18,8 @@
package org.apache.kylin.source.hive;
+import java.io.IOException;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.metadata.model.IBuildable;
@@ -28,8 +30,10 @@ import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourcePartition;
-//used by reflection
public class HiveSource implements ISource {
+ //used by reflection
+ public HiveSource(KylinConfig config) {
+ }
@Override
public ISourceMetadataExplorer getSourceMetadataExplorer() {
@@ -53,7 +57,7 @@ public class HiveSource implements ISource {
if (tableDesc.isView()) {
KylinConfig config = KylinConfig.getInstanceFromEnv();
String tableName = tableDesc.getMaterializedName();
-
+
tableDesc = new TableDesc();
tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable());
tableDesc.setName(tableName);
@@ -75,4 +79,8 @@ public class HiveSource implements ISource {
return new HiveMetadataExplorer();
}
+ @Override
+ public void close() throws IOException {
+ // not needed
+ }
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index 5e06f90..ae3bbc5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -18,6 +18,9 @@
package org.apache.kylin.source.jdbc;
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TableDesc;
@@ -27,8 +30,10 @@ import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourcePartition;
-//used by reflection
public class JdbcSource implements ISource {
+ //used by reflection
+ public JdbcSource(KylinConfig config) {
+ }
@Override
public ISourceMetadataExplorer getSourceMetadataExplorer() {
@@ -62,4 +67,8 @@ public class JdbcSource implements ISource {
return new JdbcExplorer();
}
+ @Override
+ public void close() throws IOException {
+ // not needed
+ }
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 1142243..0ab83c6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -18,6 +18,7 @@
package org.apache.kylin.source.kafka;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -46,11 +47,14 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
-//used by reflection
public class KafkaSource implements ISource {
private static final Logger logger = LoggerFactory.getLogger(KafkaSource.class);
+ //used by reflection
+ public KafkaSource(KylinConfig config) {
+ }
+
@SuppressWarnings("unchecked")
@Override
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
@@ -75,20 +79,25 @@ public class KafkaSource implements ISource {
if (range == null || range.start.v.equals(0L)) {
final CubeSegment last = cube.getLastSegment();
if (last != null) {
- logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd());
+ logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: "
+ + last.getSourcePartitionOffsetEnd());
// from last seg's end position
result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd());
- } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
- logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart());
+ } else if (cube.getDescriptor().getPartitionOffsetStart() != null
+ && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
+ logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: "
+ + cube.getDescriptor().getPartitionOffsetStart());
result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
} else {
// from the topic's earliest offset;
- logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
+ logger.debug(
+ "Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
}
}
- final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
+ final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
+ .getKafkaConfig(cube.getRootFactTable());
final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
final String topic = kafkaConfig.getTopic();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
@@ -111,7 +120,9 @@ public class KafkaSource implements ISource {
for (Integer partitionId : latestOffsets.keySet()) {
if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) {
if (result.getSourcePartitionOffsetStart().get(partitionId) > latestOffsets.get(partitionId)) {
- throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
+ throw new IllegalArgumentException("Partition " + partitionId + " end offset ("
+ + latestOffsets.get(partitionId) + ") is smaller than start offset ( "
+ + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
}
} else {
throw new IllegalStateException("New partition added in between, retry.");
@@ -129,7 +140,8 @@ public class KafkaSource implements ISource {
}
if (totalStartOffset > totalEndOffset) {
- throw new IllegalArgumentException("Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
+ throw new IllegalArgumentException(
+ "Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
}
if (totalStartOffset == totalEndOffset) {
@@ -155,7 +167,8 @@ public class KafkaSource implements ISource {
if (startOffset > 0) {
if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) {
- throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
+ throw new IllegalArgumentException(
+ "When 'startOffset' is > 0, need provide each partition's start offset");
}
long totalOffset = 0;
@@ -164,13 +177,15 @@ public class KafkaSource implements ISource {
}
if (totalOffset != startOffset) {
- throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
+ throw new IllegalArgumentException(
+ "Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
}
}
if (endOffset > 0 && endOffset != Long.MAX_VALUE) {
if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) {
- throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
+ throw new IllegalArgumentException(
+ "When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
}
long totalOffset = 0;
@@ -179,7 +194,8 @@ public class KafkaSource implements ISource {
}
if (totalOffset != endOffset) {
- throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
+ throw new IllegalArgumentException(
+ "Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
}
}
}
@@ -199,7 +215,8 @@ public class KafkaSource implements ISource {
}
@Override
- public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj) throws Exception {
+ public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj)
+ throws Exception {
throw new UnsupportedOperationException();
}
@@ -223,4 +240,8 @@ public class KafkaSource implements ISource {
throw new UnsupportedOperationException();
}
+ @Override
+ public void close() throws IOException {
+ // not needed
+ }
}
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
index 8aac466..a2a0616 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
@@ -54,38 +54,39 @@ public class KylinTableCreator {
List<Pair<String, String>> columns = Lists.newLinkedList();
columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuery());
columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
- return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQuery(), columns);
+ return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQuery(), columns);
}
public static TableDesc generateKylinTableForMetricsQueryCube(KylinConfig kylinConfig, SinkTool sinkTool) {
List<Pair<String, String>> columns = Lists.newLinkedList();
columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryCube());
columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
- return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQueryCube(), columns);
+ return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQueryCube(), columns);
}
public static TableDesc generateKylinTableForMetricsQueryRPC(KylinConfig kylinConfig, SinkTool sinkTool) {
List<Pair<String, String>> columns = Lists.newLinkedList();
columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryRPC());
columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
- return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns);
+ return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns);
}
public static TableDesc generateKylinTableForMetricsJob(KylinConfig kylinConfig, SinkTool sinkTool) {
List<Pair<String, String>> columns = Lists.newLinkedList();
columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJob());
columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
- return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectJob(), columns);
+ return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectJob(), columns);
}
public static TableDesc generateKylinTableForMetricsJobException(KylinConfig kylinConfig, SinkTool sinkTool) {
List<Pair<String, String>> columns = Lists.newLinkedList();
columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJobException());
columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
- return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectJobException(), columns);
+ return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectJobException(), columns);
}
- public static TableDesc generateKylinTable(SinkTool sinkTool, String subject, List<Pair<String, String>> columns) {
+ public static TableDesc generateKylinTable(KylinConfig kylinConfig, SinkTool sinkTool, String subject,
+ List<Pair<String, String>> columns) {
TableDesc kylinTable = new TableDesc();
Pair<String, String> tableNameSplits = ActiveReservoirReporter
@@ -107,7 +108,7 @@ public class KylinTableCreator {
}
kylinTable.setColumns(columnDescs);
- kylinTable.init(MetricsManager.SYSTEM_PROJECT);
+ kylinTable.init(kylinConfig, MetricsManager.SYSTEM_PROJECT);
return kylinTable;
}
--
To stop receiving notification emails like this one, please contact
liyang@apache.org.