You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/04/03 03:15:32 UTC

[kylin] 07/08: KYLIN-3315 allow each project to set its own source

This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 54bdae63cfca137c02cdf8749f93d152a9ac2010
Author: lidongsjtu <li...@apache.org>
AuthorDate: Tue Apr 3 11:11:36 2018 +0800

    KYLIN-3315 allow each project to set its own source
---
 .../test/java/org/apache/kylin/job/DeployUtil.java |   4 +-
 .../java/org/apache/kylin/cube/CubeManager.java    |   4 +-
 .../org/apache/kylin/dict/lookup/SnapshotCLI.java  |   4 +-
 .../kylin/metadata/TableMetadataManager.java       |   4 +-
 .../kylin/metadata/model/ExternalFilterDesc.java   |  21 ++-
 .../apache/kylin/metadata/model/ISourceAware.java  |   4 +
 .../org/apache/kylin/metadata/model/TableDesc.java |  13 +-
 .../kylin/metadata/project/ProjectInstance.java    |   7 +-
 .../main/java/org/apache/kylin/source/ISource.java |   8 +-
 .../org/apache/kylin/source/SourceFactory.java     |  62 ---------
 .../org/apache/kylin/source/SourceManager.java     | 154 +++++++++++++++++++++
 .../java/org/apache/kylin/engine/mr/MRUtil.java    |  10 +-
 .../kylin/engine/mr/common/JobRelatedMetaUtil.java |   4 +-
 .../kylin/provision/BuildCubeWithStream.java       |   4 +-
 .../org/apache/kylin/source/SourceManagerTest.java |  61 ++++++++
 .../source/hive/ITHiveSourceTableLoaderTest.java   |   4 +-
 .../kylin/source/hive/ITSnapshotManagerTest.java   |   4 +-
 .../source/jdbc/ITJdbcSourceTableLoaderTest.java   |   9 +-
 .../kylin/source/jdbc/ITJdbcTableReaderTest.java   |   5 +
 .../kylin/query/schema/OLAPSchemaFactory.java      |  25 ++--
 .../org/apache/kylin/query/util/PushDownUtil.java  |   3 +-
 .../kylin/rest/controller/TableController.java     |   8 +-
 .../apache/kylin/rest/job/StorageCleanupJob.java   |   4 +-
 .../org/apache/kylin/rest/service/JobService.java  |   4 +-
 .../apache/kylin/rest/service/TableService.java    |  55 ++++----
 .../org/apache/kylin/source/hive/HiveSource.java   |  12 +-
 .../org/apache/kylin/source/jdbc/JdbcSource.java   |  11 +-
 .../org/apache/kylin/source/kafka/KafkaSource.java |  47 +++++--
 .../tool/metrics/systemcube/KylinTableCreator.java |  15 +-
 29 files changed, 401 insertions(+), 169 deletions(-)

diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index a418dc9..524c2e4 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -47,7 +47,7 @@ import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.ISampleDataDeployer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.datagen.ModelDataGenerator;
 import org.apache.kylin.source.kafka.TimedJsonStreamParser;
 import org.apache.maven.model.Model;
@@ -231,7 +231,7 @@ public class DeployUtil {
         }
         tempDir.deleteOnExit();
 
-        ISampleDataDeployer sampleDataDeployer = SourceFactory.getSource(model.getRootFactTable().getTableDesc())
+        ISampleDataDeployer sampleDataDeployer = SourceManager.getSource(model.getRootFactTable().getTableDesc())
                 .getSampleDataDeployer();
         
         // create hive tables
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index b8deadb..fc2ad3d 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -69,7 +69,7 @@ import org.apache.kylin.metadata.realization.IRealizationProvider;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.IReadableTable;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.SourcePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1052,7 +1052,7 @@ public class CubeManager implements IRealizationProvider {
             SnapshotManager snapshotMgr = getSnapshotManager();
 
             TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
-            IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+            IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
             SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
 
             segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
index 2093d23..f965d18 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 
 public class SnapshotCLI {
 
@@ -42,7 +42,7 @@ public class SnapshotCLI {
         if (tableDesc == null)
             throw new IllegalArgumentException("Not table found by " + table);
 
-        SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceFactory.createReadableTable(tableDesc), tableDesc, overwriteUUID);
+        SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceManager.createReadableTable(tableDesc), tableDesc, overwriteUUID);
         System.out.println("resource path updated: " + snapshot.getResourcePath());
     }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
index 42233b7..116e210 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
@@ -118,7 +118,7 @@ public class TableMetadataManager {
             @Override
             protected TableDesc initEntityAfterReload(TableDesc t, String resourceName) {
                 String prj = TableDesc.parseResourcePath(resourceName).getSecond();
-                t.init(prj);
+                t.init(config, prj);
                 return t;
             }
         };
@@ -237,7 +237,7 @@ public class TableMetadataManager {
 
     public void saveSourceTable(TableDesc srcTable, String prj) throws IOException {
         try (AutoLock lock = srcTableMapLock.lockForWrite()) {
-            srcTable.init(prj);
+            srcTable.init(config, prj);
             srcTableCrud.save(srcTable);
         }
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java
index 35018c7..7ef84aa 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.metadata.model;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.metadata.filter.function.Functions;
@@ -62,7 +63,7 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
     public String resourceName() {
         return name;
     }
-    
+
     public String getFilterResourceIdentifier() {
         return filterResourceIdentifier;
     }
@@ -94,7 +95,8 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
 
     @Override
     public String toString() {
-        return "ExternalFilterDesc [ name=" + name + " filter table resource identifier " + this.filterResourceIdentifier + "]";
+        return "ExternalFilterDesc [ name=" + name + " filter table resource identifier "
+                + this.filterResourceIdentifier + "]";
     }
 
     /** create a mockup table for unit test */
@@ -104,11 +106,6 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
         return mockup;
     }
 
-    @Override
-    public int getSourceType() {
-        return sourceType;
-    }
-
     public void setSourceType(int sourceType) {
         this.sourceType = sourceType;
     }
@@ -120,4 +117,14 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
     public void setDescription(String description) {
         this.description = description;
     }
+
+    @Override
+    public int getSourceType() {
+        return sourceType;
+    }
+
+    @Override
+    public KylinConfig getConfig() {
+        return null;
+    }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
index 7ab1bca..eab3e2c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.metadata.model;
 
+import org.apache.kylin.common.KylinConfig;
+
 public interface ISourceAware {
 
     public static final int ID_HIVE = 0;
@@ -27,4 +29,6 @@ public interface ISourceAware {
     public static final int ID_JDBC = 8;
 
     int getSourceType();
+
+    KylinConfig getConfig();
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index be278de..a9e9877 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.Pair;
@@ -98,6 +99,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
     private String dataGen;
 
     private String project;
+    private KylinConfig config;
     private DatabaseDesc database = new DatabaseDesc();
     private String identity = null;
     private boolean isBorrowedFromGlobal = false;
@@ -121,6 +123,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         }
 
         this.project = other.project;
+        this.config = other.config;
         this.database.setName(other.getDatabase());
         this.identity = other.identity;
     }
@@ -287,9 +290,10 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         return dataGen;
     }
 
-    public void init(String project) {
+    public void init(KylinConfig config, String project) {
         this.project = project;
-
+        this.config = config;
+        
         if (name != null)
             name = name.toUpperCase();
 
@@ -372,6 +376,11 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         return sourceType;
     }
 
+    @Override
+    public KylinConfig getConfig() {
+        return config;
+    }
+
     public void setSourceType(int sourceType) {
         this.sourceType = sourceType;
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 9b7aaf2..45622f3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -31,6 +31,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.realization.RealizationType;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -47,7 +48,7 @@ import com.google.common.collect.Lists;
  */
 @SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class ProjectInstance extends RootPersistentEntity {
+public class ProjectInstance extends RootPersistentEntity implements ISourceAware {
 
     public static final String DEFAULT_PROJECT_NAME = "default";
 
@@ -338,4 +339,8 @@ public class ProjectInstance extends RootPersistentEntity {
         return "ProjectDesc [name=" + name + "]";
     }
 
+    @Override
+    public int getSourceType() {
+        return getConfig().getDefaultSource();
+    }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 42548ae..2c5a922 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -18,13 +18,15 @@
 
 package org.apache.kylin.source;
 
+import java.io.Closeable;
+
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 
 /**
  * Represents a kind of source to Kylin, like Hive.
  */
-public interface ISource {
+public interface ISource extends Closeable {
 
     /** 
      * Return an explorer to sync table metadata from the data source.
@@ -41,13 +43,13 @@ public interface ISource {
      * Return a ReadableTable that can iterate through the rows of given table.
      */
     IReadableTable createReadableTable(TableDesc tableDesc);
-    
+
     /**
      * Give the source a chance to enrich a SourcePartition before build start.
      * Particularly, Kafka source use this chance to define start/end offsets within each partition.
      */
     SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition);
-    
+
     /**
      * Return an object that is responsible for deploying sample (CSV) data to the source database.
      * For testing purpose.
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
deleted file mode 100644
index 365b505..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source;
-
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ImplementationSwitch;
-import org.apache.kylin.metadata.model.ISourceAware;
-import org.apache.kylin.metadata.model.TableDesc;
-
-public class SourceFactory {
-
-    // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads.
-    private static ThreadLocal<ImplementationSwitch<ISource>> sources = new ThreadLocal<>();
-
-    private static ISource getSource(int sourceType) {
-        ImplementationSwitch<ISource> current = sources.get();
-        if (current == null) {
-            current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getSourceEngines(), ISource.class);
-            sources.set(current);
-        }
-        return current.get(sourceType);
-    }
-
-    public static ISource getDefaultSource() {
-        return getSource(KylinConfig.getInstanceFromEnv().getDefaultSource());
-    }
-
-    public static ISource getSource(ISourceAware aware) {
-        return getSource(aware.getSourceType());
-    }
-
-    public static IReadableTable createReadableTable(TableDesc table) {
-        return getSource(table).createReadableTable(table);
-    }
-
-    public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
-        return getSource(table).adaptToBuildEngine(engineInterface);
-    }
-
-    public static List<String> getMRDependentResources(TableDesc table) {
-        return getSource(table).getSourceMetadataExplorer().getRelatedKylinResources(table);
-    }
-
-}
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java
new file mode 100644
index 0000000..62c4368
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public class SourceManager {
+    private static final Logger logger = LoggerFactory.getLogger(SourceManager.class);
+
+    private final KylinConfig systemConfig;
+    private final Cache<String, ISource> sourceMap;
+
+    public static SourceManager getInstance(KylinConfig config) {
+        return config.getManager(SourceManager.class);
+    }
+
+    // called by reflection
+    static SourceManager newInstance(KylinConfig config) throws IOException {
+        return new SourceManager(config);
+    }
+
+    // ============================================
+
+    private SourceManager(KylinConfig config) {
+        this.systemConfig = config;
+        this.sourceMap = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.DAYS)
+                .removalListener(new RemovalListener<String, ISource>() {
+                    @Override
+                    public void onRemoval(RemovalNotification<String, ISource> entry) {
+                        ISource s = entry.getValue();
+                        if (s != null) {
+                            try {
+                                s.close();
+                            } catch (Throwable e) {
+                                logger.error("Failed to close ISource: {}", s.getClass().getName(), e);
+                            }
+                        }
+                    }
+                }).build();
+    }
+
+    public ISource getCachedSource(ISourceAware aware) {
+        String key = createSourceCacheKey(aware);
+        ISource source = sourceMap.getIfPresent(key);
+        if (source != null)
+            return source;
+
+        synchronized (this) {
+            source = sourceMap.getIfPresent(key);
+            if (source != null)
+                return source;
+
+            source = createSource(aware);
+            sourceMap.put(key, source);
+            return source;
+        }
+    }
+
+    public ISource getProjectSource(String projectName) {
+        ProjectInstance projectInstance = ProjectManager.getInstance(systemConfig).getProject(projectName);
+        if (projectInstance != null)
+            return getCachedSource(projectInstance);
+        else
+            return getDefaultSource();
+    }
+
+    private String createSourceCacheKey(ISourceAware aware) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(aware.getSourceType()).append('|');
+
+        KylinConfig config = aware.getConfig();
+        builder.append(config.getJdbcSourceConnectionUrl()).append('|');
+        builder.append(config.getJdbcSourceDriver()).append('|');
+        builder.append(config.getJdbcSourceUser()).append('|');
+        builder.append(config.getJdbcSourceFieldDelimiter()).append('|');
+        builder.append(config.getJdbcSourceDialect()).append('|');
+        return builder.toString(); // jdbc password not needed, because url+user should be identical.
+    }
+
+    private ISource createSource(ISourceAware aware) {
+        String sourceClazz = systemConfig.getSourceEngines().get(aware.getSourceType());
+        try {
+            return ClassUtil.forName(sourceClazz, ISource.class).getDeclaredConstructor(KylinConfig.class)
+                    .newInstance(aware.getConfig());
+        } catch (Throwable e) {
+            logger.error("Failed to create source: SourceType={}", aware.getSourceType(), e);
+            return null;
+        }
+    }
+
+    // ==========================================================
+
+    public static ISource getSource(ISourceAware aware) {
+        return getInstance(aware.getConfig()).getCachedSource(aware);
+    }
+
+    public static ISource getDefaultSource() {
+        final KylinConfig sysConfig = KylinConfig.getInstanceFromEnv();
+        return getSource(new ISourceAware() {
+            @Override
+            public int getSourceType() {
+                return sysConfig.getDefaultSource();
+            }
+
+            @Override
+            public KylinConfig getConfig() {
+                return sysConfig;
+            }
+        });
+    }
+
+    public static IReadableTable createReadableTable(TableDesc table) {
+        return getSource(table).createReadableTable(table);
+    }
+
+    public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
+        return getSource(table).adaptToBuildEngine(engineInterface);
+    }
+
+    public static List<String> getMRDependentResources(TableDesc table) {
+        return getSource(table).getSourceMetadataExplorer().getRelatedKylinResources(table);
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 124e5e7..3a9d0ed 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -33,23 +33,23 @@ import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
 
 public class MRUtil {
 
     public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
-        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
+        return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
     }
 
     public static IMRTableInputFormat getTableInputFormat(String tableName, String prj) {
         TableDesc t = getTableDesc(tableName, prj);
-        return SourceFactory.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t);
+        return SourceManager.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t);
     }
 
     public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
-        return SourceFactory.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
+        return SourceManager.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
     }
 
     private static TableDesc getTableDesc(String tableName, String prj) {
@@ -73,7 +73,7 @@ public class MRUtil {
     }
 
     public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
-        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
+        return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
     }
 
     public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
index c34245b..2cd1841 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
@@ -24,7 +24,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +47,7 @@ public class JobRelatedMetaUtil {
         for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
             TableDesc table = tableRef.getTableDesc();
             dumpList.add(table.getResourcePath());
-            dumpList.addAll(SourceFactory.getMRDependentResources(table));
+            dumpList.addAll(SourceManager.getMRDependentResources(table));
         }
 
         return dumpList;
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 181e8b9..216ccc1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -62,7 +62,7 @@ import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.rest.job.StorageCleanupJob;
 import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
@@ -282,7 +282,7 @@ public class BuildCubeWithStream {
 
     protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
         CubeInstance cubeInstance = cubeManager.getCube(cubeName);
-        ISource source = SourceFactory.getSource(cubeInstance);
+        ISource source = SourceManager.getSource(cubeInstance);
         SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance,
                 new SourcePartition(null, new SegmentRange(startOffset, endOffset), null, null));
         CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition);
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java
new file mode 100644
index 0000000..1d7440e
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.ISourceAware;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class SourceManagerTest extends LocalFileMetadataTestCase {
+    @BeforeClass
+    public static void beforeClass() {
+        staticCreateTestMetadata();
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        cleanAfterClass();
+    }
+
+    @Test
+    public void testGetSource() {
+        final KylinConfig config = getTestConfig();
+        SourceManager sourceManager = SourceManager.getInstance(config);
+        ISource source = sourceManager.getCachedSource(new ISourceAware() {
+            @Override
+            public int getSourceType() {
+                return config.getDefaultSource();
+            }
+
+            @Override
+            public KylinConfig getConfig() {
+                return config;
+            }
+        });
+
+        Assert.assertEquals(config.getSourceEngines().get(config.getDefaultSource()), source.getClass().getName());
+        Assert.assertEquals(source, SourceManager.getDefaultSource());
+        Assert.assertEquals(source, SourceManager.getInstance(getTestConfig()).getProjectSource(null));
+    }
+
+}
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
index 8e57bed..a5aea1b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -26,7 +26,7 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,7 +45,7 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
 
     @Test
     public void test() throws Exception {
-        ISource source = SourceFactory.getDefaultSource();
+        ISource source = SourceManager.getDefaultSource();
         ISourceMetadataExplorer explr = source.getSourceMetadataExplorer();
         Pair<TableDesc, TableExtDesc> pair;
         
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
index 384aa95..031da29 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.IReadableTable.TableReader;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,7 +56,7 @@ public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
     public void basicTest() throws Exception {
         String tableName = "EDW.TEST_SITES";
         TableDesc tableDesc = TableMetadataManager.getInstance(getTestConfig()).getTableDesc(tableName, "default");
-        IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+        IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
         String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
 
         snapshotMgr.wipeoutCache();
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
index 3869cb6..557e2e7 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
@@ -35,7 +35,7 @@ import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.query.H2Database;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.datagen.ModelDataGenerator;
 import org.junit.After;
 import org.junit.Before;
@@ -94,7 +94,7 @@ public class ITJdbcSourceTableLoaderTest extends LocalFileMetadataTestCase imple
     @Test
     public void test() throws Exception {
 
-        ISource source = SourceFactory.getSource(new ITJdbcSourceTableLoaderTest());
+        ISource source = SourceManager.getSource(new ITJdbcSourceTableLoaderTest());
         ISourceMetadataExplorer explr = source.getSourceMetadataExplorer();
         Pair<TableDesc, TableExtDesc> pair;
 
@@ -111,4 +111,9 @@ public class ITJdbcSourceTableLoaderTest extends LocalFileMetadataTestCase imple
         return ISourceAware.ID_JDBC;
     }
 
+    @Override
+    public KylinConfig getConfig() {
+        return config;
+    }
+
 }
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
index 4a5bfe4..4441178 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
@@ -106,4 +106,9 @@ public class ITJdbcTableReaderTest extends LocalFileMetadataTestCase implements
         return ISourceAware.ID_JDBC;
     }
 
+    @Override
+    public KylinConfig getConfig() {
+        return getTestConfig();
+    }
+
 }
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
index 25baf55..a1935fe 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
@@ -49,19 +49,22 @@ public class OLAPSchemaFactory implements SchemaFactory {
     @Override
     public Schema create(SchemaPlus parentSchema, String schemaName, Map<String, Object> operand) {
         String project = (String) operand.get(SCHEMA_PROJECT);
-        Schema newSchema = new OLAPSchema(project, schemaName, exposeMore());
+        Schema newSchema = new OLAPSchema(project, schemaName, exposeMore(project));
         return newSchema;
     }
 
     private static Map<String, File> cachedJsons = Maps.newConcurrentMap();
 
-    public static boolean exposeMore() {
-        return KylinConfig.getInstanceFromEnv().isPushDownEnabled();
+    public static boolean exposeMore(String project) {
+        return ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project).getConfig()
+                .isPushDownEnabled();
     }
 
     public static File createTempOLAPJson(String project, KylinConfig config) {
 
-        Collection<TableDesc> tables = ProjectManager.getInstance(config).listExposedTables(project, exposeMore());
+        ProjectManager projectManager = ProjectManager.getInstance(config);
+        KylinConfig projConfig = projectManager.getProject(project).getConfig();
+        Collection<TableDesc> tables = projectManager.listExposedTables(project, exposeMore(project));
 
         // "database" in TableDesc correspond to our schema
         // the logic to decide which schema to be "default" in calcite:
@@ -92,17 +95,16 @@ public class OLAPSchemaFactory implements SchemaFactory {
 
             int counter = 0;
 
-
-
+            String schemaFactory = projConfig.getSchemaFactory();
             for (String schemaName : schemaCounts.keySet()) {
                 out.append("        {\n");
                 out.append("            \"type\": \"custom\",\n");
                 out.append("            \"name\": \"" + schemaName + "\",\n");
-                out.append("            \"factory\": \"" + KylinConfig.getInstanceFromEnv().getSchemaFactory()+ "\",\n");
+                out.append("            \"factory\": \"" + schemaFactory + "\",\n");
                 out.append("            \"operand\": {\n");
                 out.append("                \"" + SCHEMA_PROJECT + "\": \"" + project + "\"\n");
                 out.append("            },\n");
-                createOLAPSchemaFunctions(out);
+                createOLAPSchemaFunctions(projConfig.getUDFs(), out);
                 out.append("        }\n");
 
                 if (++counter != schemaCounts.size()) {
@@ -132,9 +134,12 @@ public class OLAPSchemaFactory implements SchemaFactory {
         }
     }
 
-    private static void createOLAPSchemaFunctions(StringBuilder out) throws IOException {
+    private static void createOLAPSchemaFunctions(Map<String, String> definedUdfs, StringBuilder out)
+            throws IOException {
         Map<String, String> udfs = Maps.newHashMap();
-        udfs.putAll(KylinConfig.getInstanceFromEnv().getUDFs());
+        if (definedUdfs != null)
+            udfs.putAll(definedUdfs);
+
         for (Entry<String, Class<?>> entry : MeasureTypeFactory.getUDAFs().entrySet()) {
             udfs.put(entry.getKey(), entry.getValue().getName());
         }
diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index cfe16c0..7c88141 100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -47,6 +47,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.apache.kylin.metadata.realization.NoRealizationFoundException;
 import org.apache.kylin.metadata.realization.RoutingIndicatorException;
@@ -74,7 +75,7 @@ public class PushDownUtil {
     private static Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownQuery(String project, String sql,
             String defaultSchema, SQLException sqlException, boolean isSelect, boolean isPrepare) throws Exception {
 
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        KylinConfig kylinConfig = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project).getConfig();
 
         if (!kylinConfig.isPushDownEnabled())
             return null;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 35849f0..7ada8cc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -182,9 +182,9 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive", method = { RequestMethod.GET }, produces = { "application/json" })
     @ResponseBody
-    private List<String> showHiveDatabases() throws IOException {
+    private List<String> showHiveDatabases(@RequestParam(value = "project", required = false) String project) throws IOException {
         try {
-            return tableService.getHiveDbNames();
+            return tableService.getSourceDbNames(project);
         } catch (Throwable e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage());
@@ -199,9 +199,9 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive/{database}", method = { RequestMethod.GET }, produces = { "application/json" })
     @ResponseBody
-    private List<String> showHiveTables(@PathVariable String database) throws IOException {
+    private List<String> showHiveTables(@PathVariable String database, @RequestParam(value = "project", required = false) String project) throws IOException {
         try {
-            return tableService.getHiveTableNames(database);
+            return tableService.getSourceTableNames(project, database);
         } catch (Throwable e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index 59bd21f..114050c 100755
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -52,7 +52,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -133,7 +133,7 @@ public class StorageCleanupJob extends AbstractApplication {
     }
 
     protected List<String> getHiveTables() throws Exception {
-        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+        ISourceMetadataExplorer explr = SourceManager.getDefaultSource().getSourceMetadataExplorer();
         return explr.listTables(KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable());
     }
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 6afc568..4317ed5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -61,7 +61,7 @@ import org.apache.kylin.rest.msg.Message;
 import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.slf4j.Logger;
@@ -230,7 +230,7 @@ public class JobService extends BasicService implements InitializingBean {
         CubeSegment newSeg = null;
         try {
             if (buildType == CubeBuildTypeEnum.BUILD) {
-                ISource source = SourceFactory.getSource(cube);
+                ISource source = SourceManager.getSource(cube);
                 SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart,
                         sourcePartitionOffsetEnd);
                 src = source.enrichSourcePartitionBeforeBuild(cube, src);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index d737a6a..ace1686 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -29,8 +29,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import javax.annotation.Nullable;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
@@ -40,11 +38,11 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.msg.Message;
@@ -52,7 +50,7 @@ import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.response.TableDescResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
 import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -64,8 +62,6 @@ import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.SetMultimap;
@@ -136,7 +132,7 @@ public class TableService extends BasicService {
         for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
             TableDesc tableDesc = pair.getFirst();
             TableExtDesc extDesc = pair.getSecond();
-            
+
             TableDesc origTable = metaMgr.getTableDesc(tableDesc.getIdentity(), project);
             if (origTable == null || origTable.getProject() == null) {
                 tableDesc.setUuid(UUID.randomUUID().toString());
@@ -157,7 +153,7 @@ public class TableService extends BasicService {
             }
             extDesc.init(project);
             metaMgr.saveTableExt(extDesc, project);
-            
+
             saved.add(tableDesc.getIdentity());
         }
 
@@ -176,14 +172,15 @@ public class TableService extends BasicService {
 
         // load all tables first
         List<Pair<TableDesc, TableExtDesc>> allMeta = Lists.newArrayList();
-        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+        ProjectInstance projectInstance = getProjectManager().getProject(project);
+        ISourceMetadataExplorer explr = SourceManager.getSource(projectInstance).getSourceMetadataExplorer();
         for (Map.Entry<String, String> entry : db2tables.entries()) {
             Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(entry.getKey(), entry.getValue(), project);
             TableDesc tableDesc = pair.getFirst();
             Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey().toUpperCase()));
             Preconditions.checkState(tableDesc.getName().equals(entry.getValue().toUpperCase()));
-            Preconditions.checkState(tableDesc.getIdentity().equals(entry.getKey().toUpperCase() + "." + entry
-                .getValue().toUpperCase()));
+            Preconditions.checkState(tableDesc.getIdentity()
+                    .equals(entry.getKey().toUpperCase() + "." + entry.getValue().toUpperCase()));
             TableExtDesc extDesc = pair.getSecond();
             Preconditions.checkState(tableDesc.getIdentity().equals(extDesc.getIdentity()));
             allMeta.add(pair);
@@ -191,7 +188,8 @@ public class TableService extends BasicService {
         return allMeta;
     }
 
-    public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile) throws Exception {
+    public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile)
+            throws Exception {
         aclEvaluate.checkProjectAdminPermission(project);
         String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
         Map<String, String[]> result = new HashMap<String, String[]>();
@@ -258,13 +256,14 @@ public class TableService extends BasicService {
 
         tableName = normalizeHiveTableName(tableName);
         TableDesc desc = getTableManager().getTableDesc(tableName, project);
-        
+
         // unload of legacy global table is not supported for now
         if (desc == null || desc.getProject() == null) {
-            logger.warn("Unload Table {} in Project {} failed, could not find TableDesc or related Project", tableName, project);
+            logger.warn("Unload Table {} in Project {} failed, could not find TableDesc or related Project", tableName,
+                    project);
             return false;
         }
-        
+
         tableType = desc.getSourceType();
 
         if (!modelService.isTableInModel(desc, project)) {
@@ -274,7 +273,7 @@ public class TableService extends BasicService {
             List<String> models = modelService.getModelsUsingTable(desc, project);
             throw new BadRequestException(String.format(msg.getTABLE_IN_USE_BY_MODEL(), models));
         }
-        
+
         // it is a project local table, ready to remove since no model is using it within the project
         TableMetadataManager metaMgr = getTableManager();
         metaMgr.removeTableExt(tableName, project);
@@ -313,30 +312,27 @@ public class TableService extends BasicService {
 
     /**
      *
+     * @param project
      * @return
      * @throws Exception
      */
-    public List<String> getHiveDbNames() throws Exception {
-        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+    public List<String> getSourceDbNames(String project) throws Exception {
+        ISourceMetadataExplorer explr = SourceManager.getInstance(getConfig()).getProjectSource(project)
+                .getSourceMetadataExplorer();
         return explr.listDatabases();
     }
 
     /**
      *
+     * @param project
      * @param database
      * @return
      * @throws Exception
      */
-    public List<String> getHiveTableNames(String database) throws Exception {
-        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
-        List<String> hiveTableNames = explr.listTables(database);
-        Iterable<String> kylinApplicationTableNames = Iterables.filter(hiveTableNames, new Predicate<String>() {
-            @Override
-            public boolean apply(@Nullable String input) {
-                return input != null && !input.startsWith(MetadataConstants.KYLIN_INTERMEDIATE_PREFIX);
-            }
-        });
-        return Lists.newArrayList(kylinApplicationTableNames);
+    public List<String> getSourceTableNames(String project, String database) throws Exception {
+        ISourceMetadataExplorer explr = SourceManager.getInstance(getConfig()).getProjectSource(project)
+                .getSourceMetadataExplorer();
+        return explr.listTables(database);
     }
 
     private TableDescResponse cloneTableDesc(TableDesc table, String prj) {
@@ -355,7 +351,8 @@ public class TableService extends BasicService {
                 if (cards.length > i) {
                     cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
                 } else {
-                    logger.error("The result cardinality is not identical with hive table metadata, cardinality : " + scard + " column array length: " + cdescs.length);
+                    logger.error("The result cardinality is not identical with hive table metadata, cardinality : "
+                            + scard + " column array length: " + cdescs.length);
                     break;
                 }
             }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index 129098c..58bd2c3 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.source.hive;
 
+import java.io.IOException;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.IBuildable;
@@ -28,8 +30,10 @@ import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourcePartition;
 
-//used by reflection
 public class HiveSource implements ISource {
+    //used by reflection
+    public HiveSource(KylinConfig config) {
+    }
 
     @Override
     public ISourceMetadataExplorer getSourceMetadataExplorer() {
@@ -53,7 +57,7 @@ public class HiveSource implements ISource {
         if (tableDesc.isView()) {
             KylinConfig config = KylinConfig.getInstanceFromEnv();
             String tableName = tableDesc.getMaterializedName();
-            
+
             tableDesc = new TableDesc();
             tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable());
             tableDesc.setName(tableName);
@@ -75,4 +79,8 @@ public class HiveSource implements ISource {
         return new HiveMetadataExplorer();
     }
 
+    @Override
+    public void close() throws IOException {
+        // not needed
+    }
 }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index 5e06f90..ae3bbc5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.source.jdbc;
 
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -27,8 +30,10 @@ import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourcePartition;
 
-//used by reflection
 public class JdbcSource implements ISource {
+    //used by reflection
+    public JdbcSource(KylinConfig config) {
+    }
 
     @Override
     public ISourceMetadataExplorer getSourceMetadataExplorer() {
@@ -62,4 +67,8 @@ public class JdbcSource implements ISource {
         return new JdbcExplorer();
     }
 
+    @Override
+    public void close() throws IOException {
+        // not needed
+    }
 }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 1142243..0ab83c6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.source.kafka;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -46,11 +47,14 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
-//used by reflection
 public class KafkaSource implements ISource {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaSource.class);
 
+    //used by reflection
+    public KafkaSource(KylinConfig config) {
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
@@ -75,20 +79,25 @@ public class KafkaSource implements ISource {
         if (range == null || range.start.v.equals(0L)) {
             final CubeSegment last = cube.getLastSegment();
             if (last != null) {
-                logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd());
+                logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: "
+                        + last.getSourcePartitionOffsetEnd());
                 // from last seg's end position
                 result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd());
-            } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
-                logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart());
+            } else if (cube.getDescriptor().getPartitionOffsetStart() != null
+                    && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
+                logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: "
+                        + cube.getDescriptor().getPartitionOffsetStart());
                 result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
             } else {
                 // from the topic's earliest offset;
-                logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
+                logger.debug(
+                        "Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
                 result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
             }
         }
 
-        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
+        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
+                .getKafkaConfig(cube.getRootFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
         final String topic = kafkaConfig.getTopic();
         try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
@@ -111,7 +120,9 @@ public class KafkaSource implements ISource {
             for (Integer partitionId : latestOffsets.keySet()) {
                 if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) {
                     if (result.getSourcePartitionOffsetStart().get(partitionId) > latestOffsets.get(partitionId)) {
-                        throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
+                        throw new IllegalArgumentException("Partition " + partitionId + " end offset ("
+                                + latestOffsets.get(partitionId) + ") is smaller than start offset ( "
+                                + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
                     }
                 } else {
                     throw new IllegalStateException("New partition added in between, retry.");
@@ -129,7 +140,8 @@ public class KafkaSource implements ISource {
         }
 
         if (totalStartOffset > totalEndOffset) {
-            throw new IllegalArgumentException("Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
+            throw new IllegalArgumentException(
+                    "Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
         }
 
         if (totalStartOffset == totalEndOffset) {
@@ -155,7 +167,8 @@ public class KafkaSource implements ISource {
 
         if (startOffset > 0) {
             if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) {
-                throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
+                throw new IllegalArgumentException(
+                        "When 'startOffset' is > 0, need provide each partition's start offset");
             }
 
             long totalOffset = 0;
@@ -164,13 +177,15 @@ public class KafkaSource implements ISource {
             }
 
             if (totalOffset != startOffset) {
-                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
+                throw new IllegalArgumentException(
+                        "Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
             }
         }
 
         if (endOffset > 0 && endOffset != Long.MAX_VALUE) {
             if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) {
-                throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
+                throw new IllegalArgumentException(
+                        "When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
             }
 
             long totalOffset = 0;
@@ -179,7 +194,8 @@ public class KafkaSource implements ISource {
             }
 
             if (totalOffset != endOffset) {
-                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
+                throw new IllegalArgumentException(
+                        "Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
             }
         }
     }
@@ -199,7 +215,8 @@ public class KafkaSource implements ISource {
             }
 
             @Override
-            public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj) throws Exception {
+            public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj)
+                    throws Exception {
                 throw new UnsupportedOperationException();
             }
 
@@ -223,4 +240,8 @@ public class KafkaSource implements ISource {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void close() throws IOException {
+        // not needed
+    }
 }
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
index 8aac466..a2a0616 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
@@ -54,38 +54,39 @@ public class KylinTableCreator {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuery());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQuery(), columns);
+        return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQuery(), columns);
     }
 
     public static TableDesc generateKylinTableForMetricsQueryCube(KylinConfig kylinConfig, SinkTool sinkTool) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryCube());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQueryCube(), columns);
+        return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQueryCube(), columns);
     }
 
     public static TableDesc generateKylinTableForMetricsQueryRPC(KylinConfig kylinConfig, SinkTool sinkTool) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryRPC());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns);
+        return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns);
     }
 
     public static TableDesc generateKylinTableForMetricsJob(KylinConfig kylinConfig, SinkTool sinkTool) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJob());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectJob(), columns);
+        return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectJob(), columns);
     }
 
     public static TableDesc generateKylinTableForMetricsJobException(KylinConfig kylinConfig, SinkTool sinkTool) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJobException());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectJobException(), columns);
+        return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectJobException(), columns);
     }
 
-    public static TableDesc generateKylinTable(SinkTool sinkTool, String subject, List<Pair<String, String>> columns) {
+    public static TableDesc generateKylinTable(KylinConfig kylinConfig, SinkTool sinkTool, String subject,
+            List<Pair<String, String>> columns) {
         TableDesc kylinTable = new TableDesc();
 
         Pair<String, String> tableNameSplits = ActiveReservoirReporter
@@ -107,7 +108,7 @@ public class KylinTableCreator {
         }
         kylinTable.setColumns(columnDescs);
 
-        kylinTable.init(MetricsManager.SYSTEM_PROJECT);
+        kylinTable.init(kylinConfig, MetricsManager.SYSTEM_PROJECT);
 
         return kylinTable;
     }

-- 
To stop receiving notification emails like this one, please contact
liyang@apache.org.