You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/04/03 03:15:25 UTC

[kylin] branch sync updated (f50120a -> f48984b)

This is an automated email from the ASF dual-hosted git repository.

liyang pushed a change to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from f50120a  minor, fix query handling right outer join case
     new 3222de2  KYLIN-3311 refactor Resource.checkAndPutResourceImpl() to throw WriteConflictException
     new 190f466  minor, add null case.
     new 25b4b3b  KYLIN-3277 Kylin should override hiveconf settings when connecting to hive using jdbc
     new 2e58857  KYLIN-3277 Code review
     new 35009d4  KYLIN-3324 fix NegativeArraySizeException in CreateDictionaryJob$2.getDictionary()
     new 4d4915b  minor, add test util SetAndUnsetSystemProp
     new 54bdae6  KYLIN-3315 allow each project to set its own source
     new f48984b  KYLIN-3315 allow each project to set its own source

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../test/java/org/apache/kylin/job/DeployUtil.java |   4 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |   2 +-
 .../common/persistence/FileResourceStore.java      |   6 +-
 .../common/persistence/HDFSResourceStore.java      |   4 +-
 .../kylin/common/persistence/ResourceStore.java    |  12 +-
 ...eException.java => WriteConflictException.java} |   9 +-
 .../apache/kylin/common/util/HiveCmdBuilder.java   |  54 +-------
 .../kylin/common/util/HiveConfigurationUtil.java   | 100 +++++++++++++
 .../common/persistence/ResourceStoreTest.java      |   2 +-
 .../common/util/HiveConfigurationUtilTest.java     |  26 ++--
 .../kylin/common/util/SetAndUnsetSystemProp.java}  |  24 ++--
 .../java/org/apache/kylin/cube/CubeManager.java    |   7 +-
 .../org/apache/kylin/dict/lookup/SnapshotCLI.java  |   4 +-
 .../kylin/metadata/TableMetadataManager.java       |   4 +-
 .../kylin/metadata/TempStatementManager.java       |   3 +-
 .../kylin/metadata/model/ExternalFilterDesc.java   |  21 ++-
 .../apache/kylin/metadata/model/ISourceAware.java  |   4 +
 .../org/apache/kylin/metadata/model/TableDesc.java |  13 +-
 .../kylin/metadata/project/ProjectInstance.java    |   7 +-
 .../main/java/org/apache/kylin/source/ISource.java |   8 +-
 .../org/apache/kylin/source/SourceFactory.java     |  62 ---------
 .../org/apache/kylin/source/SourceManager.java     | 154 +++++++++++++++++++++
 .../java/org/apache/kylin/engine/mr/MRUtil.java    |  10 +-
 .../kylin/engine/mr/common/JobRelatedMetaUtil.java |   4 +-
 .../kylin/engine/mr/steps/CreateDictionaryJob.java |   6 +-
 .../engine/mr/steps/FactDistinctColumnsJob.java    |   3 +-
 .../mr/steps/FactDistinctColumnsReducer.java       |   3 +-
 .../kylin/engine/mr/steps/UHCDictionaryJob.java    |   4 +-
 .../engine/mr/steps/UHCDictionaryReducer.java      |   3 +-
 .../kylin/provision/BuildCubeWithStream.java       |   4 +-
 .../org/apache/kylin/source/SourceManagerTest.java |  61 ++++++++
 .../source/hive/ITHiveSourceTableLoaderTest.java   |   4 +-
 .../kylin/source/hive/ITSnapshotManagerTest.java   |   4 +-
 .../source/jdbc/ITJdbcSourceTableLoaderTest.java   |   9 +-
 .../kylin/source/jdbc/ITJdbcTableReaderTest.java   |   5 +
 .../kylin/query/schema/OLAPSchemaFactory.java      |  25 ++--
 .../org/apache/kylin/query/util/PushDownUtil.java  |   3 +-
 .../kylin/rest/controller/TableController.java     |   8 +-
 .../apache/kylin/rest/job/StorageCleanupJob.java   |   4 +-
 .../org/apache/kylin/rest/service/AclService.java  |   3 +-
 .../org/apache/kylin/rest/service/JobService.java  |   4 +-
 .../apache/kylin/rest/service/TableService.java    |  55 ++++----
 .../org/apache/kylin/rest/util/AclUtilTest.java    |  12 ++
 .../kylin/source/hive/BeelineHiveClient.java       |  15 +-
 .../org/apache/kylin/source/hive/HiveSource.java   |  12 +-
 .../org/apache/kylin/source/jdbc/JdbcSource.java   |  11 +-
 .../org/apache/kylin/source/kafka/KafkaSource.java |  47 +++++--
 .../kylin/storage/hbase/HBaseConnection.java       |   3 +-
 .../kylin/storage/hbase/HBaseResourceStore.java    |   3 +-
 .../tool/metrics/systemcube/KylinTableCreator.java |  15 +-
 50 files changed, 587 insertions(+), 283 deletions(-)
 rename core-common/src/main/java/org/apache/kylin/common/persistence/{StorageException.java => WriteConflictException.java} (84%)
 create mode 100644 core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java
 copy core-job/src/test/java/org/apache/kylin/job/BasicLocalMetaTest.java => core-common/src/test/java/org/apache/kylin/common/util/HiveConfigurationUtilTest.java (60%)
 copy core-common/src/{main/java/org/apache/kylin/common/util/FIFOIterable.java => test/java/org/apache/kylin/common/util/SetAndUnsetSystemProp.java} (73%)
 delete mode 100644 core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
 create mode 100644 core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java
 create mode 100644 kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java

-- 
To stop receiving notification emails like this one, please contact
liyang@apache.org.

[kylin] 05/08: KYLIN-3324 fix NegativeArraySizeException in CreateDictionaryJob$2.getDictionary()

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 35009d465b9c628e2be8ab9bceff7852e20a4d42
Author: Li Yang <li...@apache.org>
AuthorDate: Thu Mar 29 06:04:16 2018 +0800

    KYLIN-3324 fix NegativeArraySizeException in CreateDictionaryJob$2.getDictionary()
---
 .../java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java  | 6 +++---
 .../org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java    | 3 ++-
 .../apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java    | 3 ++-
 .../java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java     | 4 ++--
 .../java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java | 3 ++-
 5 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index dab4880..e01da9e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.ToolRunner;
@@ -97,10 +97,10 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
 
                 try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(), SequenceFile.Reader.file(dictFile))) {
                     NullWritable key = NullWritable.get();
-                    BytesWritable value = new BytesWritable();
+                    ArrayPrimitiveWritable value = new ArrayPrimitiveWritable();
                     reader.next(key, value);
 
-                    ByteBuffer buffer = new ByteArray(value.getBytes()).asBuffer();
+                    ByteBuffer buffer = new ByteArray((byte[]) value.get()).asBuffer();
                     try (DataInputStream is = new DataInputStream(new ByteBufferBackedInputStream(buffer))) {
                         String dictClassName = is.readUTF();
                         Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index cc4f260..f96944a 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -144,7 +145,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
 
         // make each reducer output to respective dir
         MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
         MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
         MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index cad947c..801771a 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -239,7 +240,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
             outputStream.writeUTF(dict.getClass().getName());
             dict.write(outputStream);
 
-            mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName);
+            mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
         }
     }
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
index 1b1a7f0..0903228 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
@@ -24,7 +24,7 @@ import java.util.Map;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -131,7 +131,7 @@ public class UHCDictionaryJob extends AbstractHadoopJob {
         job.setPartitionerClass(UHCDictionaryPartitioner.class);
         job.setNumReduceTasks(numberOfReducers);
 
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
         FileOutputFormat.setOutputPath(job, output);
         job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
index 6da198d..d9a3549 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -106,7 +107,7 @@ public class UHCDictionaryReducer extends KylinReducer<SelfDefineSortableKey, Nu
             outputStream.writeUTF(dict.getClass().getName());
             dict.write(outputStream);
 
-            mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName);
+            mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
         }
         mos.close();
     }

-- 
To stop receiving notification emails like this one, please contact
liyang@apache.org.

[kylin] 01/08: KYLIN-3311 refactor Resource.checkAndPutResourceImpl() to throw WriteConflictException

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 3222de27c508ed3e7576330dc8c40206fe6cd84d
Author: Li Yang <li...@apache.org>
AuthorDate: Sun Mar 25 18:12:53 2018 +0800

    KYLIN-3311 refactor Resource.checkAndPutResourceImpl() to throw WriteConflictException
---
 .../apache/kylin/common/persistence/FileResourceStore.java   |  6 +++---
 .../apache/kylin/common/persistence/HDFSResourceStore.java   |  4 ++--
 .../org/apache/kylin/common/persistence/ResourceStore.java   | 12 ++++++++----
 .../{StorageException.java => WriteConflictException.java}   |  9 +++------
 .../apache/kylin/common/persistence/ResourceStoreTest.java   |  2 +-
 .../src/main/java/org/apache/kylin/cube/CubeManager.java     |  3 ++-
 .../java/org/apache/kylin/metadata/TempStatementManager.java |  3 ++-
 .../main/java/org/apache/kylin/rest/service/AclService.java  |  3 ++-
 .../java/org/apache/kylin/storage/hbase/HBaseConnection.java |  3 +--
 .../org/apache/kylin/storage/hbase/HBaseResourceStore.java   |  3 ++-
 10 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
index 38ccbdd..7c6c506 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
@@ -31,13 +31,13 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public class FileResourceStore extends ResourceStore {
@@ -176,12 +176,12 @@ public class FileResourceStore extends ResourceStore {
 
     @Override
     protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS)
-            throws IOException, IllegalStateException {
+            throws IOException, WriteConflictException {
         synchronized (FileResourceStore.class) {
 
             File f = file(resPath);
             if ((f.exists() && f.lastModified() != oldTS) || (f.exists() == false && oldTS != 0))
-                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS
+                throw new WriteConflictException("Overwriting conflict " + resPath + ", expect old TS " + oldTS
                         + ", but found " + f.lastModified());
 
             putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
index 8ad2540..1739ce0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
@@ -207,7 +207,7 @@ public class HDFSResourceStore extends ResourceStore {
     }
 
     @Override
-    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
+    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, WriteConflictException {
         Path p = getRealHDFSPath(resPath);
         if (!fs.exists(p)) {
             if (oldTS != 0) {
@@ -217,7 +217,7 @@ public class HDFSResourceStore extends ResourceStore {
         } else {
             long realLastModify = getResourceTimestamp(resPath);
             if (realLastModify != oldTS) {
-                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but found " + realLastModify);
+                throw new WriteConflictException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but found " + realLastModify);
             }
         }
         putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 2bccd67..bda6cd0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -276,14 +276,16 @@ abstract public class ResourceStore {
     /**
      * check & set, overwrite a resource
      */
-    final public <T extends RootPersistentEntity> long putResource(String resPath, T obj, Serializer<T> serializer) throws IOException {
+    final public <T extends RootPersistentEntity> long putResource(String resPath, T obj, Serializer<T> serializer)
+            throws IOException, WriteConflictException {
         return putResource(resPath, obj, System.currentTimeMillis(), serializer);
     }
 
     /**
      * check & set, overwrite a resource
      */
-    final public <T extends RootPersistentEntity> long putResource(String resPath, T obj, long newTS, Serializer<T> serializer) throws IOException {
+    final public <T extends RootPersistentEntity> long putResource(String resPath, T obj, long newTS,
+            Serializer<T> serializer) throws IOException, WriteConflictException {
         resPath = norm(resPath);
         //logger.debug("Saving resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")");
 
@@ -309,7 +311,8 @@ abstract public class ResourceStore {
         }
     }
 
-    private long checkAndPutResourceCheckpoint(String resPath, byte[] content, long oldTS, long newTS) throws IOException {
+    private long checkAndPutResourceCheckpoint(String resPath, byte[] content, long oldTS, long newTS)
+            throws IOException, WriteConflictException {
         beforeChange(resPath);
         return checkAndPutResourceImpl(resPath, content, oldTS, newTS);
     }
@@ -317,7 +320,8 @@ abstract public class ResourceStore {
     /**
      * checks old timestamp when overwriting existing
      */
-    abstract protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException;
+    abstract protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS)
+            throws IOException, WriteConflictException;
 
     /**
      * delete a resource, does nothing on a folder
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/StorageException.java b/core-common/src/main/java/org/apache/kylin/common/persistence/WriteConflictException.java
similarity index 84%
rename from core-common/src/main/java/org/apache/kylin/common/persistence/StorageException.java
rename to core-common/src/main/java/org/apache/kylin/common/persistence/WriteConflictException.java
index 604941a..f235b05 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/StorageException.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/WriteConflictException.java
@@ -19,19 +19,16 @@
 package org.apache.kylin.common.persistence;
 
 /**
- * 
- * @author xjiang
- * 
  */
-public class StorageException extends RuntimeException {
+public class WriteConflictException extends RuntimeException {
 
     private static final long serialVersionUID = -3748712888242406257L;
 
-    public StorageException(String msg, Throwable t) {
+    public WriteConflictException(String msg, Throwable t) {
         super(msg, t);
     }
 
-    public StorageException(String msg) {
+    public WriteConflictException(String msg) {
         super(msg);
     }
 
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
index f183e7c..3e1a31c 100644
--- a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
@@ -137,7 +137,7 @@ public class ResourceStoreTest {
             t.setLastModified(t.getLastModified() - 1);
             store.putResource(path2, t, StringEntity.serializer);
             fail("write conflict should trigger IllegalStateException");
-        } catch (IllegalStateException e) {
+        } catch (WriteConflictException e) {
             // expected
         }
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 15bb676..b8deadb 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -37,6 +37,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.persistence.WriteConflictException;
 import org.apache.kylin.common.util.AutoReadWriteLock;
 import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
 import org.apache.kylin.common.util.Dictionary;
@@ -354,7 +355,7 @@ public class CubeManager implements IRealizationProvider {
 
         try {
             cube = crud.save(cube);
-        } catch (IllegalStateException ise) {
+        } catch (WriteConflictException ise) {
             logger.warn("Write conflict to update cube " + cube.getName() + " at try " + retry + ", will retry...");
             if (retry >= 7) {
                 logger.error("Retried 7 times till got error, abandoning...", ise);
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java
index 970df0c..182fc47 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/TempStatementManager.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.WriteConflictException;
 import org.apache.kylin.common.util.AutoReadWriteLock;
 import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
@@ -136,7 +137,7 @@ public class TempStatementManager {
     private void updateTempStatementWithRetry(TempStatementEntity entity, int retry) throws IOException {
         try {
             crud.save(entity);
-        } catch (IllegalStateException ise) {
+        } catch (WriteConflictException ise) {
             logger.warn("Write conflict to update temp statement" + entity.statementId + " at try " + retry
                     + ", will retry...");
             if (retry >= 7) {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
index f3e2393..4f439fe 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
@@ -32,6 +32,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.persistence.WriteConflictException;
 import org.apache.kylin.common.util.AutoReadWriteLock;
 import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
@@ -305,7 +306,7 @@ public class AclService implements MutableAclService, InitializingBean {
                 crud.save(record);
                 return acl; // here we are done
 
-            } catch (IllegalStateException ise) {
+            } catch (WriteConflictException ise) {
                 if (retry <= 0) {
                     logger.error("Retry is out, till got error, abandoning...", ise);
                     throw ise;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 0c2fb04..53e8a68 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.lock.DistributedLock;
-import org.apache.kylin.common.persistence.StorageException;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -269,7 +268,7 @@ public class HBaseConnection {
 
         } catch (Throwable t) {
             logger.error("Error when open connection " + url, t);
-            throw new StorageException("Error when open connection " + url, t);
+            throw new RuntimeException("Error when open connection " + url, t);
         }
 
         return connection;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 3762437..23df556 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -54,6 +54,7 @@ import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.StringEntity;
+import org.apache.kylin.common.persistence.WriteConflictException;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.HadoopUtil;
@@ -321,7 +322,7 @@ public class HBaseResourceStore extends ResourceStore {
                     + ", operation result: " + ok);
             if (!ok) {
                 long real = getResourceTimestampImpl(resPath);
-                throw new IllegalStateException(
+                throw new WriteConflictException(
                         "Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
             }
 

-- 
To stop receiving notification emails like this one, please contact
liyang@apache.org.

[kylin] 04/08: KYLIN-3277 Code review

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2e58857d8d6903e5a0e5dc92efdaf93e7fe1f479
Author: nichunen <ch...@kyligence.io>
AuthorDate: Tue Mar 27 22:40:19 2018 +0800

    KYLIN-3277 Code review
---
 .../apache/kylin/common/util/HiveCmdBuilder.java   |  7 ----
 .../kylin/common/util/HiveConfigurationUtil.java   | 19 +++++-----
 .../common/util/HiveConfigurationUtilTest.java     | 40 ++++++++++++++++++++++
 .../kylin/source/hive/BeelineHiveClient.java       |  3 +-
 4 files changed, 50 insertions(+), 19 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
index 9a9c65b..1823dfc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
@@ -18,20 +18,13 @@
 
 package org.apache.kylin.common.util;
 
-import java.io.File;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Map;
 
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.NodeList;
 
 import com.google.common.collect.Lists;
 
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java
index a8cba61..1c6f985 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java
@@ -18,20 +18,21 @@
 
 package org.apache.kylin.common.util;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.NodeList;
+import static org.apache.kylin.common.util.HiveCmdBuilder.HIVE_CONF_FILENAME;
 
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.kylin.common.util.HiveCmdBuilder.HIVE_CONF_FILENAME;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
 
 /**
  * @author ycq
@@ -97,5 +98,3 @@ public class HiveConfigurationUtil {
     }
 
 }
-
-
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HiveConfigurationUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/HiveConfigurationUtilTest.java
new file mode 100644
index 0000000..d4019a9
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/HiveConfigurationUtilTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class HiveConfigurationUtilTest {
+    @Before
+    public void setup() {
+        System.setProperty("log4j.configuration", "file:../build/conf/kylin-tools-log4j.properties");
+        System.setProperty("KYLIN_CONF", LocalFileMetadataTestCase.LOCALMETA_TEST_DATA);
+    }
+
+    @Test
+    public void testHiveConf() {
+        Properties properties = HiveConfigurationUtil.loadHiveJDBCProperties();
+        assertTrue(properties.containsKey("hiveconf:hive.auto.convert.join.noconditionaltask.size"));
+    }
+}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
index 593ad96..52b752c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
@@ -30,10 +30,10 @@ import java.util.Properties;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.DBUtils;
+import org.apache.kylin.common.util.HiveConfigurationUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.HiveConfigurationUtil;
 
 public class BeelineHiveClient implements IHiveClient {
 
@@ -43,7 +43,6 @@ public class BeelineHiveClient implements IHiveClient {
     private Statement stmt;
     private DatabaseMetaData metaData;
 
-
     public BeelineHiveClient(String beelineParams) {
         if (StringUtils.isEmpty(beelineParams)) {
             throw new IllegalArgumentException("BeelineParames cannot be empty");

-- 
To stop receiving notification emails like this one, please contact
liyang@apache.org.

[kylin] 03/08: KYLIN-3277 Kylin should override hiveconf settings when connecting to hive using jdbc

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 25b4b3bc06b30a4030f4949d97a05566bcbddca4
Author: ycq <yc...@meitu.com>
AuthorDate: Wed Mar 7 17:10:38 2018 +0800

    KYLIN-3277 Kylin should override hiveconf settings when connecting to hive using jdbc
    
    Signed-off-by: nichunen <ch...@kyligence.io>
---
 .../apache/kylin/common/util/HiveCmdBuilder.java   |  47 +---------
 .../kylin/common/util/HiveConfigurationUtil.java   | 101 +++++++++++++++++++++
 .../kylin/source/hive/BeelineHiveClient.java       |  16 +++-
 3 files changed, 115 insertions(+), 49 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
index fcfc598..9a9c65b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
@@ -46,12 +46,12 @@ public class HiveCmdBuilder {
     }
 
     private KylinConfig kylinConfig;
-    final private Map<String, String> hiveConfProps = new HashMap<>();
+    final private Map<String, String> hiveConfProps;
     final private ArrayList<String> statements = Lists.newArrayList();
 
     public HiveCmdBuilder() {
         kylinConfig = KylinConfig.getInstanceFromEnv();
-        loadHiveConfiguration();
+        hiveConfProps = HiveConfigurationUtil.loadHiveConfiguration();
     }
 
     public String build() {
@@ -153,47 +153,4 @@ public class HiveCmdBuilder {
         return build();
     }
 
-    private void loadHiveConfiguration() {
-
-        File hiveConfFile;
-        String hiveConfFileName = (HIVE_CONF_FILENAME + ".xml");
-        String path = System.getProperty(KylinConfig.KYLIN_CONF);
-
-        if (StringUtils.isNotEmpty(path)) {
-            hiveConfFile = new File(path, hiveConfFileName);
-        } else {
-            path = KylinConfig.getKylinHome();
-            if (StringUtils.isEmpty(path)) {
-                logger.error("KYLIN_HOME is not set, can not locate hive conf: {}.xml", HIVE_CONF_FILENAME);
-                return;
-            }
-            hiveConfFile = new File(path + File.separator + "conf", hiveConfFileName);
-        }
-
-        if (!hiveConfFile.exists()) {
-            throw new RuntimeException("Missing config file: " + hiveConfFile.getAbsolutePath());
-        }
-
-        String fileUrl = OptionsHelper.convertToFileURL(hiveConfFile.getAbsolutePath());
-
-        try {
-            File file = new File(fileUrl);
-            if (file.exists()) {
-                DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-                DocumentBuilder builder = factory.newDocumentBuilder();
-                Document doc = builder.parse(file);
-                NodeList nl = doc.getElementsByTagName("property");
-                hiveConfProps.clear();
-                for (int i = 0; i < nl.getLength(); i++) {
-                    String key = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
-                    String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
-                    if (!key.equals("tmpjars")) {
-                        hiveConfProps.put(key, value);
-                    }
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to parse hive conf file ", e);
-        }
-    }
 }
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java
new file mode 100644
index 0000000..a8cba61
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kylin.common.util.HiveCmdBuilder.HIVE_CONF_FILENAME;
+
+/**
+ * @author ycq
+ * @since 2018-03-05
+ */
+public class HiveConfigurationUtil {
+
+    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(HiveConfigurationUtil.class);
+    private static final String HIVE_CONF_PREFIX = "hiveconf:";
+
+    public static Properties loadHiveJDBCProperties() {
+        Map<String, String> hiveConfiguration = loadHiveConfiguration();
+        Properties ret = new Properties();
+        for (Map.Entry<String, String> entry : hiveConfiguration.entrySet()) {
+            ret.put(HIVE_CONF_PREFIX + entry.getKey(), entry.getValue());
+        }
+        return ret;
+    }
+
+    public static Map<String, String> loadHiveConfiguration() {
+        Map<String, String> hiveConfProps = new HashMap<>();
+        File hiveConfFile;
+        String hiveConfFileName = (HIVE_CONF_FILENAME + ".xml");
+        String path = System.getProperty(KylinConfig.KYLIN_CONF);
+
+        if (StringUtils.isNotEmpty(path)) {
+            hiveConfFile = new File(path, hiveConfFileName);
+        } else {
+            path = KylinConfig.getKylinHome();
+            if (StringUtils.isEmpty(path)) {
+                logger.error("KYLIN_HOME is not set, can not locate hive conf: {}.xml", HIVE_CONF_FILENAME);
+                return hiveConfProps;
+            }
+            hiveConfFile = new File(path + File.separator + "conf", hiveConfFileName);
+        }
+
+        if (!hiveConfFile.exists()) {
+            throw new RuntimeException("Failed to read " + HIVE_CONF_FILENAME + ".xml");
+        }
+
+        String fileUrl = OptionsHelper.convertToFileURL(hiveConfFile.getAbsolutePath());
+
+        try {
+            File file = new File(fileUrl);
+            if (file.exists()) {
+                DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+                DocumentBuilder builder = factory.newDocumentBuilder();
+                Document doc = builder.parse(file);
+                NodeList nl = doc.getElementsByTagName("property");
+                hiveConfProps.clear();
+                for (int i = 0; i < nl.getLength(); i++) {
+                    String key = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
+                    String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
+                    if (!key.equals("tmpjars")) {
+                        hiveConfProps.put(key, value);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to parse hive conf file ", e);
+        }
+        return hiveConfProps;
+    }
+
+}
+
+
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
index 747b1bb..593ad96 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
@@ -26,25 +26,30 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.DBUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.HiveConfigurationUtil;
 
 public class BeelineHiveClient implements IHiveClient {
 
+    private static final String HIVE_AUTH_USER = "user";
+    private static final String HIVE_AUTH_PASSWD = "password";
     private Connection cnct;
     private Statement stmt;
     private DatabaseMetaData metaData;
 
+
     public BeelineHiveClient(String beelineParams) {
         if (StringUtils.isEmpty(beelineParams)) {
             throw new IllegalArgumentException("BeelineParames cannot be empty");
         }
         String[] splits = StringUtils.split(beelineParams);
-        String url = null, username = null, password = null;
+        String url = "", username = "", password = "";
         for (int i = 0; i < splits.length; i++) {
             if ("-u".equals(splits[i])) {
                 url = stripQuotes(splits[i + 1]);
@@ -56,13 +61,16 @@ public class BeelineHiveClient implements IHiveClient {
                 password = stripQuotes(splits[i + 1]);
             }
         }
-        this.init(url, username, password);
+        Properties jdbcProperties = HiveConfigurationUtil.loadHiveJDBCProperties();
+        jdbcProperties.put(HIVE_AUTH_PASSWD, password);
+        jdbcProperties.put(HIVE_AUTH_USER, username);
+        this.init(url, jdbcProperties);
     }
 
-    private void init(String url, String username, String password) {
+    private void init(String url, Properties hiveProperties) {
         try {
             Class.forName("org.apache.hive.jdbc.HiveDriver");
-            cnct = DriverManager.getConnection(url, username, password);
+            cnct = DriverManager.getConnection(url, hiveProperties);
             stmt = cnct.createStatement();
             metaData = cnct.getMetaData();
         } catch (SQLException | ClassNotFoundException e) {

-- 
To stop receiving notification emails like this one, please contact
liyang@apache.org.

[kylin] 06/08: minor, add test util SetAndUnsetSystemProp

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4d4915bff491db630166a5841290d18c520ca484
Author: Li Yang <li...@apache.org>
AuthorDate: Sun Apr 1 16:34:05 2018 +0800

    minor, add test util SetAndUnsetSystemProp
---
 .../kylin/common/util/SetAndUnsetSystemProp.java   | 34 ++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/core-common/src/test/java/org/apache/kylin/common/util/SetAndUnsetSystemProp.java b/core-common/src/test/java/org/apache/kylin/common/util/SetAndUnsetSystemProp.java
new file mode 100644
index 0000000..83d0656
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/SetAndUnsetSystemProp.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+public class SetAndUnsetSystemProp implements AutoCloseable {
+    
+    final private String propName;
+    
+    public SetAndUnsetSystemProp(String propName, String value) {
+        this.propName = propName;
+        System.setProperty(propName, value);
+    }
+
+    @Override
+    public void close() {
+        System.clearProperty(propName);
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
liyang@apache.org.

[kylin] 08/08: KYLIN-3315 allow each project to set its own source

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f48984b58067f00bfdf3ccf5c15bcfbe63c30775
Author: lidongsjtu <li...@apache.org>
AuthorDate: Thu Mar 29 14:00:39 2018 +0800

    KYLIN-3315 allow each project to set its own source
---
 core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0726897..f558f42 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -829,7 +829,7 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     public String getJdbcSourceDialect() {
-        return getOptional("kylin.source.jdbc.dialect");
+        return getOptional("kylin.source.jdbc.dialect", "default");
     }
 
     public String getJdbcSourceUser() {

-- 
To stop receiving notification emails like this one, please contact
liyang@apache.org.

[kylin] 02/08: minor, add null case.

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 190f466b109f1e229e4a6c2f725469511c66bbcd
Author: tttMelody <24...@qq.com>
AuthorDate: Mon Mar 26 13:06:32 2018 +0800

    minor, add null case.
---
 .../test/java/org/apache/kylin/rest/util/AclUtilTest.java    | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/server/src/test/java/org/apache/kylin/rest/util/AclUtilTest.java b/server/src/test/java/org/apache/kylin/rest/util/AclUtilTest.java
index 18e5bf5..217abe8 100644
--- a/server/src/test/java/org/apache/kylin/rest/util/AclUtilTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/util/AclUtilTest.java
@@ -42,6 +42,18 @@ public class AclUtilTest extends ServiceTestBase {
     AclUtil aclUtil;
 
     @Test
+    public void testNull() {
+        // ADMIN will go into hasRole first.
+        swichUser("ANALYST", Constant.ROLE_ANALYST);
+        try {
+            aclUtil.hasProjectAdminPermission(null);
+            Assert.fail("expecting some AlreadyExistsException here");
+        } catch (Exception e) {
+            Assert.assertEquals("Access is denied", e.getMessage());
+        }
+    }
+
+    @Test
     public void testBasic() throws IOException {
         final String PROJECT = "default";
         final String ANALYST = "ANALYST";

-- 
To stop receiving notification emails like this one, please contact
liyang@apache.org.

[kylin] 07/08: KYLIN-3315 allow each project to set its own source

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 54bdae63cfca137c02cdf8749f93d152a9ac2010
Author: lidongsjtu <li...@apache.org>
AuthorDate: Tue Apr 3 11:11:36 2018 +0800

    KYLIN-3315 allow each project to set its own source
---
 .../test/java/org/apache/kylin/job/DeployUtil.java |   4 +-
 .../java/org/apache/kylin/cube/CubeManager.java    |   4 +-
 .../org/apache/kylin/dict/lookup/SnapshotCLI.java  |   4 +-
 .../kylin/metadata/TableMetadataManager.java       |   4 +-
 .../kylin/metadata/model/ExternalFilterDesc.java   |  21 ++-
 .../apache/kylin/metadata/model/ISourceAware.java  |   4 +
 .../org/apache/kylin/metadata/model/TableDesc.java |  13 +-
 .../kylin/metadata/project/ProjectInstance.java    |   7 +-
 .../main/java/org/apache/kylin/source/ISource.java |   8 +-
 .../org/apache/kylin/source/SourceFactory.java     |  62 ---------
 .../org/apache/kylin/source/SourceManager.java     | 154 +++++++++++++++++++++
 .../java/org/apache/kylin/engine/mr/MRUtil.java    |  10 +-
 .../kylin/engine/mr/common/JobRelatedMetaUtil.java |   4 +-
 .../kylin/provision/BuildCubeWithStream.java       |   4 +-
 .../org/apache/kylin/source/SourceManagerTest.java |  61 ++++++++
 .../source/hive/ITHiveSourceTableLoaderTest.java   |   4 +-
 .../kylin/source/hive/ITSnapshotManagerTest.java   |   4 +-
 .../source/jdbc/ITJdbcSourceTableLoaderTest.java   |   9 +-
 .../kylin/source/jdbc/ITJdbcTableReaderTest.java   |   5 +
 .../kylin/query/schema/OLAPSchemaFactory.java      |  25 ++--
 .../org/apache/kylin/query/util/PushDownUtil.java  |   3 +-
 .../kylin/rest/controller/TableController.java     |   8 +-
 .../apache/kylin/rest/job/StorageCleanupJob.java   |   4 +-
 .../org/apache/kylin/rest/service/JobService.java  |   4 +-
 .../apache/kylin/rest/service/TableService.java    |  55 ++++----
 .../org/apache/kylin/source/hive/HiveSource.java   |  12 +-
 .../org/apache/kylin/source/jdbc/JdbcSource.java   |  11 +-
 .../org/apache/kylin/source/kafka/KafkaSource.java |  47 +++++--
 .../tool/metrics/systemcube/KylinTableCreator.java |  15 +-
 29 files changed, 401 insertions(+), 169 deletions(-)

diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index a418dc9..524c2e4 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -47,7 +47,7 @@ import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.ISampleDataDeployer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.datagen.ModelDataGenerator;
 import org.apache.kylin.source.kafka.TimedJsonStreamParser;
 import org.apache.maven.model.Model;
@@ -231,7 +231,7 @@ public class DeployUtil {
         }
         tempDir.deleteOnExit();
 
-        ISampleDataDeployer sampleDataDeployer = SourceFactory.getSource(model.getRootFactTable().getTableDesc())
+        ISampleDataDeployer sampleDataDeployer = SourceManager.getSource(model.getRootFactTable().getTableDesc())
                 .getSampleDataDeployer();
         
         // create hive tables
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index b8deadb..fc2ad3d 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -69,7 +69,7 @@ import org.apache.kylin.metadata.realization.IRealizationProvider;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.IReadableTable;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.SourcePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1052,7 +1052,7 @@ public class CubeManager implements IRealizationProvider {
             SnapshotManager snapshotMgr = getSnapshotManager();
 
             TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
-            IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+            IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
             SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
 
             segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
index 2093d23..f965d18 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 
 public class SnapshotCLI {
 
@@ -42,7 +42,7 @@ public class SnapshotCLI {
         if (tableDesc == null)
             throw new IllegalArgumentException("Not table found by " + table);
 
-        SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceFactory.createReadableTable(tableDesc), tableDesc, overwriteUUID);
+        SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceManager.createReadableTable(tableDesc), tableDesc, overwriteUUID);
         System.out.println("resource path updated: " + snapshot.getResourcePath());
     }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
index 42233b7..116e210 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
@@ -118,7 +118,7 @@ public class TableMetadataManager {
             @Override
             protected TableDesc initEntityAfterReload(TableDesc t, String resourceName) {
                 String prj = TableDesc.parseResourcePath(resourceName).getSecond();
-                t.init(prj);
+                t.init(config, prj);
                 return t;
             }
         };
@@ -237,7 +237,7 @@ public class TableMetadataManager {
 
     public void saveSourceTable(TableDesc srcTable, String prj) throws IOException {
         try (AutoLock lock = srcTableMapLock.lockForWrite()) {
-            srcTable.init(prj);
+            srcTable.init(config, prj);
             srcTableCrud.save(srcTable);
         }
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java
index 35018c7..7ef84aa 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.metadata.model;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.metadata.filter.function.Functions;
@@ -62,7 +63,7 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
     public String resourceName() {
         return name;
     }
-    
+
     public String getFilterResourceIdentifier() {
         return filterResourceIdentifier;
     }
@@ -94,7 +95,8 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
 
     @Override
     public String toString() {
-        return "ExternalFilterDesc [ name=" + name + " filter table resource identifier " + this.filterResourceIdentifier + "]";
+        return "ExternalFilterDesc [ name=" + name + " filter table resource identifier "
+                + this.filterResourceIdentifier + "]";
     }
 
     /** create a mockup table for unit test */
@@ -104,11 +106,6 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
         return mockup;
     }
 
-    @Override
-    public int getSourceType() {
-        return sourceType;
-    }
-
     public void setSourceType(int sourceType) {
         this.sourceType = sourceType;
     }
@@ -120,4 +117,14 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA
     public void setDescription(String description) {
         this.description = description;
     }
+
+    @Override
+    public int getSourceType() {
+        return sourceType;
+    }
+
+    @Override
+    public KylinConfig getConfig() {
+        return null;
+    }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
index 7ab1bca..eab3e2c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.metadata.model;
 
+import org.apache.kylin.common.KylinConfig;
+
 public interface ISourceAware {
 
     public static final int ID_HIVE = 0;
@@ -27,4 +29,6 @@ public interface ISourceAware {
     public static final int ID_JDBC = 8;
 
     int getSourceType();
+
+    KylinConfig getConfig();
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index be278de..a9e9877 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.Pair;
@@ -98,6 +99,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
     private String dataGen;
 
     private String project;
+    private KylinConfig config;
     private DatabaseDesc database = new DatabaseDesc();
     private String identity = null;
     private boolean isBorrowedFromGlobal = false;
@@ -121,6 +123,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         }
 
         this.project = other.project;
+        this.config = other.config;
         this.database.setName(other.getDatabase());
         this.identity = other.identity;
     }
@@ -287,9 +290,10 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         return dataGen;
     }
 
-    public void init(String project) {
+    public void init(KylinConfig config, String project) {
         this.project = project;
-
+        this.config = config;
+        
         if (name != null)
             name = name.toUpperCase();
 
@@ -372,6 +376,11 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         return sourceType;
     }
 
+    @Override
+    public KylinConfig getConfig() {
+        return config;
+    }
+
     public void setSourceType(int sourceType) {
         this.sourceType = sourceType;
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 9b7aaf2..45622f3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -31,6 +31,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.realization.RealizationType;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -47,7 +48,7 @@ import com.google.common.collect.Lists;
  */
 @SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class ProjectInstance extends RootPersistentEntity {
+public class ProjectInstance extends RootPersistentEntity implements ISourceAware {
 
     public static final String DEFAULT_PROJECT_NAME = "default";
 
@@ -338,4 +339,8 @@ public class ProjectInstance extends RootPersistentEntity {
         return "ProjectDesc [name=" + name + "]";
     }
 
+    @Override
+    public int getSourceType() {
+        return getConfig().getDefaultSource(); // source type = default source of the config returned by getConfig()
+    }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 42548ae..2c5a922 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -18,13 +18,15 @@
 
 package org.apache.kylin.source;
 
+import java.io.Closeable;
+
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 
 /**
  * Represents a kind of source to Kylin, like Hive.
  */
-public interface ISource {
+public interface ISource extends Closeable {
 
     /** 
      * Return an explorer to sync table metadata from the data source.
@@ -41,13 +43,13 @@ public interface ISource {
      * Return a ReadableTable that can iterate through the rows of given table.
      */
     IReadableTable createReadableTable(TableDesc tableDesc);
-    
+
     /**
      * Give the source a chance to enrich a SourcePartition before build start.
      * Particularly, Kafka source use this chance to define start/end offsets within each partition.
      */
     SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition);
-    
+
     /**
      * Return an object that is responsible for deploying sample (CSV) data to the source database.
      * For testing purpose.
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
deleted file mode 100644
index 365b505..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source;
-
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ImplementationSwitch;
-import org.apache.kylin.metadata.model.ISourceAware;
-import org.apache.kylin.metadata.model.TableDesc;
-
-public class SourceFactory {
-
-    // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads.
-    private static ThreadLocal<ImplementationSwitch<ISource>> sources = new ThreadLocal<>();
-
-    private static ISource getSource(int sourceType) {
-        ImplementationSwitch<ISource> current = sources.get();
-        if (current == null) {
-            current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getSourceEngines(), ISource.class);
-            sources.set(current);
-        }
-        return current.get(sourceType);
-    }
-
-    public static ISource getDefaultSource() {
-        return getSource(KylinConfig.getInstanceFromEnv().getDefaultSource());
-    }
-
-    public static ISource getSource(ISourceAware aware) {
-        return getSource(aware.getSourceType());
-    }
-
-    public static IReadableTable createReadableTable(TableDesc table) {
-        return getSource(table).createReadableTable(table);
-    }
-
-    public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
-        return getSource(table).adaptToBuildEngine(engineInterface);
-    }
-
-    public static List<String> getMRDependentResources(TableDesc table) {
-        return getSource(table).getSourceMetadataExplorer().getRelatedKylinResources(table);
-    }
-
-}
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java
new file mode 100644
index 0000000..62c4368
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+/**
+ * Creates and caches {@link ISource} instances, keyed by source type and JDBC source settings.
+ * Cached sources expire one day after being written and are closed when they leave the cache.
+ */
+public class SourceManager {
+    private static final Logger logger = LoggerFactory.getLogger(SourceManager.class);
+
+    private final KylinConfig systemConfig;
+    private final Cache<String, ISource> sourceMap;
+
+    /** Obtains the SourceManager from the given config through {@code getManager}. */
+    public static SourceManager getInstance(KylinConfig config) {
+        return config.getManager(SourceManager.class);
+    }
+
+    // called by reflection
+    static SourceManager newInstance(KylinConfig config) throws IOException {
+        return new SourceManager(config);
+    }
+
+    // ============================================
+
+    private SourceManager(KylinConfig config) {
+        this.systemConfig = config;
+        this.sourceMap = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.DAYS)
+                .removalListener(new RemovalListener<String, ISource>() {
+                    @Override
+                    public void onRemoval(RemovalNotification<String, ISource> entry) {
+                        // close the removed source; a failure to close is logged, never propagated
+                        ISource s = entry.getValue();
+                        if (s != null) {
+                            try {
+                                s.close();
+                            } catch (Throwable e) {
+                                logger.error("Failed to close ISource: {}", s.getClass().getName(), e);
+                            }
+                        }
+                    }
+                }).build();
+    }
+
+    /**
+     * Returns the cached source for the given source-aware object, creating it on first use.
+     *
+     * @throws IllegalStateException if the source cannot be created
+     */
+    public ISource getCachedSource(ISourceAware aware) {
+        String key = createSourceCacheKey(aware);
+        ISource source = sourceMap.getIfPresent(key);
+        if (source != null)
+            return source;
+
+        // look up again under the lock so that concurrent callers do not each create a source
+        synchronized (this) {
+            source = sourceMap.getIfPresent(key);
+            if (source != null)
+                return source;
+
+            source = createSource(aware);
+            sourceMap.put(key, source);
+            return source;
+        }
+    }
+
+    /**
+     * Returns the source of the named project, or the default source if ProjectManager has no such project.
+     */
+    public ISource getProjectSource(String projectName) {
+        ProjectInstance projectInstance = ProjectManager.getInstance(systemConfig).getProject(projectName);
+        if (projectInstance != null)
+            return getCachedSource(projectInstance);
+        else
+            return getDefaultSource();
+    }
+
+    // builds the cache key from the source type plus the JDBC source settings of the aware's config
+    private String createSourceCacheKey(ISourceAware aware) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(aware.getSourceType()).append('|');
+
+        KylinConfig config = aware.getConfig();
+        builder.append(config.getJdbcSourceConnectionUrl()).append('|');
+        builder.append(config.getJdbcSourceDriver()).append('|');
+        builder.append(config.getJdbcSourceUser()).append('|');
+        builder.append(config.getJdbcSourceFieldDelimiter()).append('|');
+        builder.append(config.getJdbcSourceDialect()).append('|');
+        return builder.toString(); // jdbc password not needed, because url+user should be identical.
+    }
+
+    /**
+     * Instantiates the ISource class configured for the aware's source type, through its constructor
+     * taking a KylinConfig. Never returns null: the Guava cache rejects null values, so a failure is
+     * reported here with its cause instead of surfacing later as a bare NullPointerException.
+     *
+     * @throws IllegalStateException if no class is configured for the source type or it cannot be instantiated
+     */
+    private ISource createSource(ISourceAware aware) {
+        int sourceType = aware.getSourceType();
+        String sourceClazz = systemConfig.getSourceEngines().get(sourceType);
+        if (sourceClazz == null)
+            throw new IllegalStateException("No ISource configured for SourceType=" + sourceType);
+
+        try {
+            return ClassUtil.forName(sourceClazz, ISource.class).getDeclaredConstructor(KylinConfig.class)
+                    .newInstance(aware.getConfig());
+        } catch (Exception | LinkageError e) {
+            throw new IllegalStateException("Failed to create source: SourceType=" + sourceType, e);
+        }
+    }
+
+    // ==========================================================
+
+    /** Returns the source for the given source-aware object, from the SourceManager of its config. */
+    public static ISource getSource(ISourceAware aware) {
+        return getInstance(aware.getConfig()).getCachedSource(aware);
+    }
+
+    /** Returns the source of the default source type of the KylinConfig obtained from the environment. */
+    public static ISource getDefaultSource() {
+        final KylinConfig sysConfig = KylinConfig.getInstanceFromEnv();
+        return getSource(new ISourceAware() {
+            @Override
+            public int getSourceType() {
+                return sysConfig.getDefaultSource();
+            }
+
+            @Override
+            public KylinConfig getConfig() {
+                return sysConfig;
+            }
+        });
+    }
+
+    /** Creates a readable table for the given table through the table's source. */
+    public static IReadableTable createReadableTable(TableDesc table) {
+        return getSource(table).createReadableTable(table);
+    }
+
+    /** Adapts the source of the given object to the requested build engine interface. */
+    public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
+        return getSource(table).adaptToBuildEngine(engineInterface);
+    }
+
+    /** Lists the Kylin resources related to the table, as reported by the source's metadata explorer. */
+    public static List<String> getMRDependentResources(TableDesc table) {
+        return getSource(table).getSourceMetadataExplorer().getRelatedKylinResources(table);
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 124e5e7..3a9d0ed 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -33,23 +33,23 @@ import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
 
 public class MRUtil {
 
     public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
-        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
+        return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
     }
 
     public static IMRTableInputFormat getTableInputFormat(String tableName, String prj) {
         TableDesc t = getTableDesc(tableName, prj);
-        return SourceFactory.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t);
+        return SourceManager.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t);
     }
 
     public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
-        return SourceFactory.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
+        return SourceManager.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
     }
 
     private static TableDesc getTableDesc(String tableName, String prj) {
@@ -73,7 +73,7 @@ public class MRUtil {
     }
 
     public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
-        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
+        return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
     }
 
     public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
index c34245b..2cd1841 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
@@ -24,7 +24,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +47,7 @@ public class JobRelatedMetaUtil {
         for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
             TableDesc table = tableRef.getTableDesc();
             dumpList.add(table.getResourcePath());
-            dumpList.addAll(SourceFactory.getMRDependentResources(table));
+            dumpList.addAll(SourceManager.getMRDependentResources(table));
         }
 
         return dumpList;
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 181e8b9..216ccc1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -62,7 +62,7 @@ import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.rest.job.StorageCleanupJob;
 import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
@@ -282,7 +282,7 @@ public class BuildCubeWithStream {
 
     protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
         CubeInstance cubeInstance = cubeManager.getCube(cubeName);
-        ISource source = SourceFactory.getSource(cubeInstance);
+        ISource source = SourceManager.getSource(cubeInstance);
         SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance,
                 new SourcePartition(null, new SegmentRange(startOffset, endOffset), null, null));
         CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition);
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java
new file mode 100644
index 0000000..1d7440e
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.ISourceAware;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class SourceManagerTest extends LocalFileMetadataTestCase { // checks that SourceManager lookups agree with each other
+    @BeforeClass
+    public static void beforeClass() {
+        staticCreateTestMetadata();
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        cleanAfterClass();
+    }
+
+    @Test
+    public void testGetSource() {
+        final KylinConfig config = getTestConfig();
+        SourceManager sourceManager = SourceManager.getInstance(config);
+        ISource source = sourceManager.getCachedSource(new ISourceAware() { // ad-hoc aware: default source type of the test config
+            @Override
+            public int getSourceType() {
+                return config.getDefaultSource();
+            }
+
+            @Override
+            public KylinConfig getConfig() {
+                return config;
+            }
+        });
+
+        Assert.assertEquals(config.getSourceEngines().get(config.getDefaultSource()), source.getClass().getName()); // class configured for the default type
+        Assert.assertEquals(source, SourceManager.getDefaultSource()); // the static default lookup must yield an equal source
+        Assert.assertEquals(source, SourceManager.getInstance(getTestConfig()).getProjectSource(null)); // so must a null project name
+    }
+
+}
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
index 8e57bed..a5aea1b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -26,7 +26,7 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,7 +45,7 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
 
     @Test
     public void test() throws Exception {
-        ISource source = SourceFactory.getDefaultSource();
+        ISource source = SourceManager.getDefaultSource();
         ISourceMetadataExplorer explr = source.getSourceMetadataExplorer();
         Pair<TableDesc, TableExtDesc> pair;
         
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
index 384aa95..031da29 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.IReadableTable.TableReader;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,7 +56,7 @@ public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
     public void basicTest() throws Exception {
         String tableName = "EDW.TEST_SITES";
         TableDesc tableDesc = TableMetadataManager.getInstance(getTestConfig()).getTableDesc(tableName, "default");
-        IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+        IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
         String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
 
         snapshotMgr.wipeoutCache();
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
index 3869cb6..557e2e7 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
@@ -35,7 +35,7 @@ import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.query.H2Database;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.datagen.ModelDataGenerator;
 import org.junit.After;
 import org.junit.Before;
@@ -94,7 +94,7 @@ public class ITJdbcSourceTableLoaderTest extends LocalFileMetadataTestCase imple
     @Test
     public void test() throws Exception {
 
-        ISource source = SourceFactory.getSource(new ITJdbcSourceTableLoaderTest());
+        ISource source = SourceManager.getSource(new ITJdbcSourceTableLoaderTest());
         ISourceMetadataExplorer explr = source.getSourceMetadataExplorer();
         Pair<TableDesc, TableExtDesc> pair;
 
@@ -111,4 +111,9 @@ public class ITJdbcSourceTableLoaderTest extends LocalFileMetadataTestCase imple
         return ISourceAware.ID_JDBC;
     }
 
+    @Override
+    public KylinConfig getConfig() {
+        return config; // NOTE(review): test() passes a new instance of this class to SourceManager.getSource(); confirm config is set on it
+    }
+
 }
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
index 4a5bfe4..4441178 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
@@ -106,4 +106,9 @@ public class ITJdbcTableReaderTest extends LocalFileMetadataTestCase implements
         return ISourceAware.ID_JDBC;
     }
 
+    @Override
+    public KylinConfig getConfig() {
+        return getTestConfig(); // this test answers getConfig() with the config from getTestConfig()
+    }
+
 }
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
index 25baf55..a1935fe 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
@@ -49,19 +49,22 @@ public class OLAPSchemaFactory implements SchemaFactory {
     @Override
     public Schema create(SchemaPlus parentSchema, String schemaName, Map<String, Object> operand) {
         String project = (String) operand.get(SCHEMA_PROJECT);
-        Schema newSchema = new OLAPSchema(project, schemaName, exposeMore());
+        Schema newSchema = new OLAPSchema(project, schemaName, exposeMore(project));
         return newSchema;
     }
 
     private static Map<String, File> cachedJsons = Maps.newConcurrentMap();
 
-    public static boolean exposeMore() {
-        return KylinConfig.getInstanceFromEnv().isPushDownEnabled();
+    public static boolean exposeMore(String project) {
+        return ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project).getConfig()
+                .isPushDownEnabled();
     }
 
     public static File createTempOLAPJson(String project, KylinConfig config) {
 
-        Collection<TableDesc> tables = ProjectManager.getInstance(config).listExposedTables(project, exposeMore());
+        ProjectManager projectManager = ProjectManager.getInstance(config);
+        KylinConfig projConfig = projectManager.getProject(project).getConfig();
+        Collection<TableDesc> tables = projectManager.listExposedTables(project, exposeMore(project));
 
         // "database" in TableDesc correspond to our schema
         // the logic to decide which schema to be "default" in calcite:
@@ -92,17 +95,16 @@ public class OLAPSchemaFactory implements SchemaFactory {
 
             int counter = 0;
 
-
-
+            String schemaFactory = projConfig.getSchemaFactory();
             for (String schemaName : schemaCounts.keySet()) {
                 out.append("        {\n");
                 out.append("            \"type\": \"custom\",\n");
                 out.append("            \"name\": \"" + schemaName + "\",\n");
-                out.append("            \"factory\": \"" + KylinConfig.getInstanceFromEnv().getSchemaFactory()+ "\",\n");
+                out.append("            \"factory\": \"" + schemaFactory + "\",\n");
                 out.append("            \"operand\": {\n");
                 out.append("                \"" + SCHEMA_PROJECT + "\": \"" + project + "\"\n");
                 out.append("            },\n");
-                createOLAPSchemaFunctions(out);
+                createOLAPSchemaFunctions(projConfig.getUDFs(), out);
                 out.append("        }\n");
 
                 if (++counter != schemaCounts.size()) {
@@ -132,9 +134,12 @@ public class OLAPSchemaFactory implements SchemaFactory {
         }
     }
 
-    private static void createOLAPSchemaFunctions(StringBuilder out) throws IOException {
+    private static void createOLAPSchemaFunctions(Map<String, String> definedUdfs, StringBuilder out)
+            throws IOException {
         Map<String, String> udfs = Maps.newHashMap();
-        udfs.putAll(KylinConfig.getInstanceFromEnv().getUDFs());
+        if (definedUdfs != null)
+            udfs.putAll(definedUdfs);
+
         for (Entry<String, Class<?>> entry : MeasureTypeFactory.getUDAFs().entrySet()) {
             udfs.put(entry.getKey(), entry.getValue().getName());
         }
diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index cfe16c0..7c88141 100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -47,6 +47,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.apache.kylin.metadata.realization.NoRealizationFoundException;
 import org.apache.kylin.metadata.realization.RoutingIndicatorException;
@@ -74,7 +75,7 @@ public class PushDownUtil {
     private static Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownQuery(String project, String sql,
             String defaultSchema, SQLException sqlException, boolean isSelect, boolean isPrepare) throws Exception {
 
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        KylinConfig kylinConfig = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project).getConfig();
 
         if (!kylinConfig.isPushDownEnabled())
             return null;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 35849f0..7ada8cc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -182,9 +182,9 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive", method = { RequestMethod.GET }, produces = { "application/json" })
     @ResponseBody
-    private List<String> showHiveDatabases() throws IOException {
+    private List<String> showHiveDatabases(@RequestParam(value = "project", required = false) String project) throws IOException {
         try {
-            return tableService.getHiveDbNames();
+            return tableService.getSourceDbNames(project);
         } catch (Throwable e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage());
@@ -199,9 +199,9 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive/{database}", method = { RequestMethod.GET }, produces = { "application/json" })
     @ResponseBody
-    private List<String> showHiveTables(@PathVariable String database) throws IOException {
+    private List<String> showHiveTables(@PathVariable String database, @RequestParam(value = "project", required = false) String project) throws IOException {
         try {
-            return tableService.getHiveTableNames(database);
+            return tableService.getSourceTableNames(project, database);
         } catch (Throwable e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index 59bd21f..114050c 100755
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -52,7 +52,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -133,7 +133,7 @@ public class StorageCleanupJob extends AbstractApplication {
     }
 
     protected List<String> getHiveTables() throws Exception {
-        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+        ISourceMetadataExplorer explr = SourceManager.getDefaultSource().getSourceMetadataExplorer();
         return explr.listTables(KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable());
     }
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 6afc568..4317ed5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -61,7 +61,7 @@ import org.apache.kylin.rest.msg.Message;
 import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.slf4j.Logger;
@@ -230,7 +230,7 @@ public class JobService extends BasicService implements InitializingBean {
         CubeSegment newSeg = null;
         try {
             if (buildType == CubeBuildTypeEnum.BUILD) {
-                ISource source = SourceFactory.getSource(cube);
+                ISource source = SourceManager.getSource(cube);
                 SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart,
                         sourcePartitionOffsetEnd);
                 src = source.enrichSourcePartitionBeforeBuild(cube, src);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index d737a6a..ace1686 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -29,8 +29,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import javax.annotation.Nullable;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
@@ -40,11 +38,11 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.msg.Message;
@@ -52,7 +50,7 @@ import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.response.TableDescResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
 import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -64,8 +62,6 @@ import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.SetMultimap;
@@ -136,7 +132,7 @@ public class TableService extends BasicService {
         for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
             TableDesc tableDesc = pair.getFirst();
             TableExtDesc extDesc = pair.getSecond();
-            
+
             TableDesc origTable = metaMgr.getTableDesc(tableDesc.getIdentity(), project);
             if (origTable == null || origTable.getProject() == null) {
                 tableDesc.setUuid(UUID.randomUUID().toString());
@@ -157,7 +153,7 @@ public class TableService extends BasicService {
             }
             extDesc.init(project);
             metaMgr.saveTableExt(extDesc, project);
-            
+
             saved.add(tableDesc.getIdentity());
         }
 
@@ -176,14 +172,15 @@ public class TableService extends BasicService {
 
         // load all tables first
         List<Pair<TableDesc, TableExtDesc>> allMeta = Lists.newArrayList();
-        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+        ProjectInstance projectInstance = getProjectManager().getProject(project);
+        ISourceMetadataExplorer explr = SourceManager.getSource(projectInstance).getSourceMetadataExplorer();
         for (Map.Entry<String, String> entry : db2tables.entries()) {
             Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(entry.getKey(), entry.getValue(), project);
             TableDesc tableDesc = pair.getFirst();
             Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey().toUpperCase()));
             Preconditions.checkState(tableDesc.getName().equals(entry.getValue().toUpperCase()));
-            Preconditions.checkState(tableDesc.getIdentity().equals(entry.getKey().toUpperCase() + "." + entry
-                .getValue().toUpperCase()));
+            Preconditions.checkState(tableDesc.getIdentity()
+                    .equals(entry.getKey().toUpperCase() + "." + entry.getValue().toUpperCase()));
             TableExtDesc extDesc = pair.getSecond();
             Preconditions.checkState(tableDesc.getIdentity().equals(extDesc.getIdentity()));
             allMeta.add(pair);
@@ -191,7 +188,8 @@ public class TableService extends BasicService {
         return allMeta;
     }
 
-    public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile) throws Exception {
+    public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile)
+            throws Exception {
         aclEvaluate.checkProjectAdminPermission(project);
         String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
         Map<String, String[]> result = new HashMap<String, String[]>();
@@ -258,13 +256,14 @@ public class TableService extends BasicService {
 
         tableName = normalizeHiveTableName(tableName);
         TableDesc desc = getTableManager().getTableDesc(tableName, project);
-        
+
         // unload of legacy global table is not supported for now
         if (desc == null || desc.getProject() == null) {
-            logger.warn("Unload Table {} in Project {} failed, could not find TableDesc or related Project", tableName, project);
+            logger.warn("Unload Table {} in Project {} failed, could not find TableDesc or related Project", tableName,
+                    project);
             return false;
         }
-        
+
         tableType = desc.getSourceType();
 
         if (!modelService.isTableInModel(desc, project)) {
@@ -274,7 +273,7 @@ public class TableService extends BasicService {
             List<String> models = modelService.getModelsUsingTable(desc, project);
             throw new BadRequestException(String.format(msg.getTABLE_IN_USE_BY_MODEL(), models));
         }
-        
+
         // it is a project local table, ready to remove since no model is using it within the project
         TableMetadataManager metaMgr = getTableManager();
         metaMgr.removeTableExt(tableName, project);
@@ -313,30 +312,27 @@ public class TableService extends BasicService {
 
     /**
      *
+     * @param project
      * @return
      * @throws Exception
      */
-    public List<String> getHiveDbNames() throws Exception {
-        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+    public List<String> getSourceDbNames(String project) throws Exception {
+        ISourceMetadataExplorer explr = SourceManager.getInstance(getConfig()).getProjectSource(project)
+                .getSourceMetadataExplorer();
         return explr.listDatabases();
     }
 
     /**
      *
+     * @param project
      * @param database
      * @return
      * @throws Exception
      */
-    public List<String> getHiveTableNames(String database) throws Exception {
-        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
-        List<String> hiveTableNames = explr.listTables(database);
-        Iterable<String> kylinApplicationTableNames = Iterables.filter(hiveTableNames, new Predicate<String>() {
-            @Override
-            public boolean apply(@Nullable String input) {
-                return input != null && !input.startsWith(MetadataConstants.KYLIN_INTERMEDIATE_PREFIX);
-            }
-        });
-        return Lists.newArrayList(kylinApplicationTableNames);
+    public List<String> getSourceTableNames(String project, String database) throws Exception {
+        ISourceMetadataExplorer explr = SourceManager.getInstance(getConfig()).getProjectSource(project)
+                .getSourceMetadataExplorer();
+        return explr.listTables(database);
     }
 
     private TableDescResponse cloneTableDesc(TableDesc table, String prj) {
@@ -355,7 +351,8 @@ public class TableService extends BasicService {
                 if (cards.length > i) {
                     cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
                 } else {
-                    logger.error("The result cardinality is not identical with hive table metadata, cardinality : " + scard + " column array length: " + cdescs.length);
+                    logger.error("The result cardinality is not identical with hive table metadata, cardinality : "
+                            + scard + " column array length: " + cdescs.length);
                     break;
                 }
             }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index 129098c..58bd2c3 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.source.hive;
 
+import java.io.IOException;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.IBuildable;
@@ -28,8 +30,10 @@ import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourcePartition;
 
-//used by reflection
 public class HiveSource implements ISource {
+    //used by reflection
+    public HiveSource(KylinConfig config) {
+    }
 
     @Override
     public ISourceMetadataExplorer getSourceMetadataExplorer() {
@@ -53,7 +57,7 @@ public class HiveSource implements ISource {
         if (tableDesc.isView()) {
             KylinConfig config = KylinConfig.getInstanceFromEnv();
             String tableName = tableDesc.getMaterializedName();
-            
+
             tableDesc = new TableDesc();
             tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable());
             tableDesc.setName(tableName);
@@ -75,4 +79,8 @@ public class HiveSource implements ISource {
         return new HiveMetadataExplorer();
     }
 
+    @Override
+    public void close() throws IOException {
+        // not needed
+    }
 }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index 5e06f90..ae3bbc5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.source.jdbc;
 
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -27,8 +30,10 @@ import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourcePartition;
 
-//used by reflection
 public class JdbcSource implements ISource {
+    //used by reflection
+    public JdbcSource(KylinConfig config) {
+    }
 
     @Override
     public ISourceMetadataExplorer getSourceMetadataExplorer() {
@@ -62,4 +67,8 @@ public class JdbcSource implements ISource {
         return new JdbcExplorer();
     }
 
+    @Override
+    public void close() throws IOException {
+        // not needed
+    }
 }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 1142243..0ab83c6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.source.kafka;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -46,11 +47,14 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
-//used by reflection
 public class KafkaSource implements ISource {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaSource.class);
 
+    //used by reflection
+    public KafkaSource(KylinConfig config) {
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
@@ -75,20 +79,25 @@ public class KafkaSource implements ISource {
         if (range == null || range.start.v.equals(0L)) {
             final CubeSegment last = cube.getLastSegment();
             if (last != null) {
-                logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd());
+                logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: "
+                        + last.getSourcePartitionOffsetEnd());
                 // from last seg's end position
                 result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd());
-            } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
-                logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart());
+            } else if (cube.getDescriptor().getPartitionOffsetStart() != null
+                    && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
+                logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: "
+                        + cube.getDescriptor().getPartitionOffsetStart());
                 result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
             } else {
                 // from the topic's earliest offset;
-                logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
+                logger.debug(
+                        "Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
                 result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
             }
         }
 
-        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
+        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
+                .getKafkaConfig(cube.getRootFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
         final String topic = kafkaConfig.getTopic();
         try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
@@ -111,7 +120,9 @@ public class KafkaSource implements ISource {
             for (Integer partitionId : latestOffsets.keySet()) {
                 if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) {
                     if (result.getSourcePartitionOffsetStart().get(partitionId) > latestOffsets.get(partitionId)) {
-                        throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
+                        throw new IllegalArgumentException("Partition " + partitionId + " end offset ("
+                                + latestOffsets.get(partitionId) + ") is smaller than start offset ( "
+                                + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
                     }
                 } else {
                     throw new IllegalStateException("New partition added in between, retry.");
@@ -129,7 +140,8 @@ public class KafkaSource implements ISource {
         }
 
         if (totalStartOffset > totalEndOffset) {
-            throw new IllegalArgumentException("Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
+            throw new IllegalArgumentException(
+                    "Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset);
         }
 
         if (totalStartOffset == totalEndOffset) {
@@ -155,7 +167,8 @@ public class KafkaSource implements ISource {
 
         if (startOffset > 0) {
             if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) {
-                throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
+                throw new IllegalArgumentException(
+                        "When 'startOffset' is > 0, need provide each partition's start offset");
             }
 
             long totalOffset = 0;
@@ -164,13 +177,15 @@ public class KafkaSource implements ISource {
             }
 
             if (totalOffset != startOffset) {
-                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
+                throw new IllegalArgumentException(
+                        "Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
             }
         }
 
         if (endOffset > 0 && endOffset != Long.MAX_VALUE) {
             if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) {
-                throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
+                throw new IllegalArgumentException(
+                        "When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
             }
 
             long totalOffset = 0;
@@ -179,7 +194,8 @@ public class KafkaSource implements ISource {
             }
 
             if (totalOffset != endOffset) {
-                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
+                throw new IllegalArgumentException(
+                        "Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
             }
         }
     }
@@ -199,7 +215,8 @@ public class KafkaSource implements ISource {
             }
 
             @Override
-            public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj) throws Exception {
+            public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj)
+                    throws Exception {
                 throw new UnsupportedOperationException();
             }
 
@@ -223,4 +240,8 @@ public class KafkaSource implements ISource {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void close() throws IOException {
+        // not needed
+    }
 }
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
index 8aac466..a2a0616 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
@@ -54,38 +54,39 @@ public class KylinTableCreator {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuery());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQuery(), columns);
+        return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQuery(), columns);
     }
 
     public static TableDesc generateKylinTableForMetricsQueryCube(KylinConfig kylinConfig, SinkTool sinkTool) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryCube());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQueryCube(), columns);
+        return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQueryCube(), columns);
     }
 
     public static TableDesc generateKylinTableForMetricsQueryRPC(KylinConfig kylinConfig, SinkTool sinkTool) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryRPC());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns);
+        return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns);
     }
 
     public static TableDesc generateKylinTableForMetricsJob(KylinConfig kylinConfig, SinkTool sinkTool) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJob());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectJob(), columns);
+        return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectJob(), columns);
     }
 
     public static TableDesc generateKylinTableForMetricsJobException(KylinConfig kylinConfig, SinkTool sinkTool) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJobException());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectJobException(), columns);
+        return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectJobException(), columns);
     }
 
-    public static TableDesc generateKylinTable(SinkTool sinkTool, String subject, List<Pair<String, String>> columns) {
+    public static TableDesc generateKylinTable(KylinConfig kylinConfig, SinkTool sinkTool, String subject,
+            List<Pair<String, String>> columns) {
         TableDesc kylinTable = new TableDesc();
 
         Pair<String, String> tableNameSplits = ActiveReservoirReporter
@@ -107,7 +108,7 @@ public class KylinTableCreator {
         }
         kylinTable.setColumns(columnDescs);
 
-        kylinTable.init(MetricsManager.SYSTEM_PROJECT);
+        kylinTable.init(kylinConfig, MetricsManager.SYSTEM_PROJECT);
 
         return kylinTable;
     }

-- 
To stop receiving notification emails like this one, please contact
liyang@apache.org.