You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/01/12 04:05:13 UTC

[10/50] [abbrv] kylin git commit: KYLIN-2351 Support using non-default HDFS path as kylin.env.hdfs-working-dir

KYLIN-2351 Support using non-default HDFS path as kylin.env.hdfs-working-dir


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/399f0c45
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/399f0c45
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/399f0c45

Branch: refs/heads/master-hbase1.x
Commit: 399f0c45108115c627cbf5f452ec33ff94167247
Parents: 503d232
Author: Li Yang <li...@apache.org>
Authored: Wed Jan 4 18:24:48 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Fri Jan 6 06:45:37 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  9 +++++++
 .../common/util/LocalFileMetadataTestCase.java  |  4 ++-
 .../kylin/dict/AppendTrieDictionaryTest.java    |  2 +-
 .../org/apache/kylin/engine/mr/HadoopUtil.java  | 28 +++++++++++++++++++-
 .../engine/mr/common/AbstractHadoopJob.java     |  4 +--
 .../engine/mr/steps/CreateDictionaryJob.java    |  2 +-
 .../mr/steps/FactDistinctColumnsReducer.java    | 10 ++++---
 .../engine/mr/steps/MergeStatisticsStep.java    |  2 +-
 .../steps/RowKeyDistributionCheckerMapper.java  |  3 ++-
 .../engine/mr/steps/SaveStatisticsStep.java     |  2 +-
 .../kylin/engine/mr/SortedColumnReaderTest.java | 20 +++++++++-----
 .../kylin/provision/BuildCubeWithEngine.java    |  3 +--
 .../apache/kylin/query/ITMassInQueryTest.java   |  3 +--
 .../kylin/storage/hbase/ITHdfsOpsTest.java      |  4 +--
 .../apache/kylin/source/hive/HiveMRInput.java   |  2 +-
 .../apache/kylin/source/kafka/KafkaMRInput.java |  2 +-
 .../kylin/storage/hbase/HBaseResourceStore.java | 11 +++-----
 .../storage/hbase/steps/DeprecatedGCStep.java   |  3 +--
 .../steps/HDFSPathGarbageCollectionStep.java    |  2 +-
 .../hbase/steps/SequenceFileCuboidWriter.java   |  2 +-
 .../storage/hbase/util/CubeMigrationCLI.java    |  3 ++-
 .../storage/hbase/util/StorageCleanupJob.java   |  3 ++-
 .../org/apache/kylin/tool/CubeMigrationCLI.java |  3 ++-
 .../apache/kylin/tool/StorageCleanupJob.java    |  4 +--
 24 files changed, 87 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3e6384f..88c4dbc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -171,6 +171,15 @@ abstract public class KylinConfigBase implements Serializable {
         if (!root.endsWith("/")) {
             root += "/";
         }
+        
+        // make sure path qualified
+        if (!root.contains("://")) {
+            if (!root.startsWith("/"))
+                root = "hdfs:///" + root;
+            else
+                root = "hdfs://" + root;
+        }
+        
         return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
index 26e1e86..4a24941 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
@@ -57,7 +57,9 @@ public class LocalFileMetadataTestCase extends AbstractKylinTestCase {
         if (System.getProperty(KylinConfig.KYLIN_CONF) == null && System.getenv(KylinConfig.KYLIN_CONF) == null)
             System.setProperty(KylinConfig.KYLIN_CONF, tempTestMetadataUrl);
 
-        KylinConfig.getInstanceFromEnv().setMetadataUrl(tempTestMetadataUrl);
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setMetadataUrl(tempTestMetadataUrl);
+        config.setProperty("kylin.env.hdfs-working-dir", "file:///tmp/kylin");
     }
 
     public static void cleanAfterClass() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index f84f08f..921925c 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -54,7 +54,7 @@ import org.junit.Test;
  */
 public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
 
-    public static final String BASE_DIR = "/tmp/kylin_append_dict";
+    public static final String BASE_DIR = "file:///tmp/kylin_append_dict";
     public static final String RESOURCE_DIR = "/dict/append_dict_test";
 
     @Before

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
index 88692a0..3119c1e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.Writable;
+import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,8 +67,33 @@ public class HadoopUtil {
         return conf;
     }
 
+    public static FileSystem getWorkingFileSystem() throws IOException {
+        return getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+    }
+    
+    public static FileSystem getWorkingFileSystem(Configuration conf) throws IOException {
+        Path workingPath = new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+        return getFileSystem(workingPath, conf);
+    }
+    
     public static FileSystem getFileSystem(String path) throws IOException {
-        return FileSystem.get(makeURI(path), getCurrentConfiguration());
+        return getFileSystem(new Path(makeURI(path)));
+    }
+    
+    public static FileSystem getFileSystem(Path path) throws IOException {
+        Configuration conf = getCurrentConfiguration();
+        return getFileSystem(path, conf);
+    }
+    
+    public static FileSystem getFileSystem(Path path, Configuration conf) {
+        if (StringUtils.isBlank(path.toUri().getScheme()))
+            throw new IllegalArgumentException("Path must be qualified: " + path);
+        
+        try {
+            return path.getFileSystem(conf);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public static URI makeURI(String filePath) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index e4e5e82..1540acd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -283,7 +283,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         try {
             Configuration jobConf = job.getConfiguration();
             FileSystem localfs = FileSystem.getLocal(jobConf);
-            FileSystem hdfs = FileSystem.get(jobConf);
+            FileSystem hdfs = HadoopUtil.getWorkingFileSystem(jobConf);
 
             StringBuilder jarList = new StringBuilder();
             StringBuilder fileList = new StringBuilder();
@@ -407,7 +407,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             inp = inp.trim();
             if (inp.endsWith("/*")) {
                 inp = inp.substring(0, inp.length() - 2);
-                FileSystem fs = FileSystem.get(job.getConfiguration());
+                FileSystem fs = HadoopUtil.getWorkingFileSystem(job.getConfiguration());
                 Path path = new Path(inp);
 
                 if (!exists(fs, path)) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 8b9697e..b27d722 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -64,7 +64,7 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
             @Override
             public Dictionary<String> getDictionary(TblColRef col) throws IOException {
                 Path dictFile = new Path(factColumnsInputPath, col.getIdentity() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
-                FileSystem fs = HadoopUtil.getFileSystem(dictFile.toString());
+                FileSystem fs = HadoopUtil.getWorkingFileSystem();
                 if (fs.exists(dictFile) == false)
                     return null;
                 

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 5d42797..3c7f951 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -43,6 +43,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.IDictionaryBuilder;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -213,7 +214,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
 
     private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException {
         final Configuration conf = context.getConfiguration();
-        final FileSystem fs = FileSystem.get(conf);
+        final FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
         final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
         final Path colDir = new Path(outputPath, col.getIdentity());
         final String fileName = col.getIdentity() + "-" + taskId % uhcReducerCount;
@@ -269,7 +270,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
 
     private FSDataOutputStream getOutputStream(Context context, String outputFileName) throws IOException {
         final Configuration conf = context.getConfiguration();
-        final FileSystem fs = FileSystem.get(conf);
+        final FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
         final Path outputPath = new Path(conf.get(BatchConstants.CFG_OUTPUT_PATH));
         final Path outputFile = new Path(outputPath, outputFileName);
         if (!fs.exists(outputPath)) {
@@ -315,8 +316,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
 
     private void writeMapperAndCuboidStatistics(Context context) throws IOException {
         Configuration conf = context.getConfiguration();
-        FileSystem fs = FileSystem.get(conf);
-        FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION_FILENAME));
+        FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
+        Path path = new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION_FILENAME);
+        FSDataOutputStream out = fs.create(path);
 
         try {
             String msg;

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 811fc24..54666d0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -123,7 +123,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
             averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
             CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
             Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
-            FileSystem fs = statisticsFilePath.getFileSystem(conf);
+            FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
                 // put the statistics to metadata store

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
index fca91a6..ee8da6b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.KylinMapper;
 
 /**
@@ -89,7 +90,7 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex
         List<byte[]> rowkeyList = new ArrayList<byte[]>();
         SequenceFile.Reader reader = null;
         try {
-            reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
+            reader = new SequenceFile.Reader(HadoopUtil.getFileSystem(path, conf), path, conf);
             Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
             Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
             while (reader.next(key, value)) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 020c62c..6120270 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -59,8 +59,8 @@ public class SaveStatisticsStep extends AbstractExecutable {
 
         ResourceStore rs = ResourceStore.getStore(kylinConf);
         try {
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
             Path statisticsFilePath = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
-            FileSystem fs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
             if (!fs.exists(statisticsFilePath))
                 throw new IOException("File " + statisticsFilePath + " does not exists");
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
index 3c4195f..be440f6 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java
@@ -48,10 +48,9 @@ public class SortedColumnReaderTest {
     @Test
     public void testReadStringMultiFile() throws Exception {
         String dirPath = "src/test/resources/multi_file_str";
-        StringBytesConverter converter = new StringBytesConverter();
         ArrayList<String> correctAnswer = readAllFiles(dirPath);
         Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter()));
-        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+        SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar"));
         IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
         while (e.moveNext()) {
@@ -124,7 +123,7 @@ public class SortedColumnReaderTest {
                 }
             }
         });
-        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("long"));
+        SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("long"));
         IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
         while (e.moveNext()) {
@@ -142,7 +141,7 @@ public class SortedColumnReaderTest {
     public void testEmptyDir() throws Exception {
         String dirPath = "src/test/resources/empty_dir";
         new File(dirPath).mkdirs();
-        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+        SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar"));
         IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
         while (e.moveNext()) {
@@ -159,7 +158,7 @@ public class SortedColumnReaderTest {
         final BytesConverter<String> converter = new StringBytesConverter();
         Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter()));
         System.out.println("correct answer:" + correctAnswer);
-        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar"));
+        SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar"));
         IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
         while (e.moveNext()) {
@@ -230,7 +229,7 @@ public class SortedColumnReaderTest {
                 }
             }
         });
-        SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("double"));
+        SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("double"));
         IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1);
         ArrayList<String> output = new ArrayList<>();
         while (e.moveNext()) {
@@ -300,4 +299,13 @@ public class SortedColumnReaderTest {
         }
         return result;
     }
+    
+    private String qualify(String path) {
+        String absolutePath = new File(path).getAbsolutePath();
+        if (absolutePath.startsWith("/"))
+            return "file://" + absolutePath;
+        else
+            return "file:///" + absolutePath;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 1c7c64d..62978db 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -120,8 +120,7 @@ public class BuildCubeWithEngine {
 
         try {
             //check hdfs permission
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
-            FileSystem fileSystem = FileSystem.get(hconf);
+            FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
             String hdfsWorkingDirectory = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
             Path coprocessorDir = new Path(hdfsWorkingDirectory);
             boolean success = fileSystem.mkdirs(coprocessorDir);

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
index 5835baf..18d79ae 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
@@ -68,8 +68,7 @@ public class ITMassInQueryTest extends KylinTestBase {
         ITKylinQueryTest.joinType = "left";
         ITKylinQueryTest.setupAll();
 
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
-        fileSystem = FileSystem.get(hconf);
+        fileSystem = HadoopUtil.getWorkingFileSystem();
 
         int sellerCount = 200;
         Random r = new Random();

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
index 499a456..786d7d1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
@@ -42,9 +42,7 @@ public class ITHdfsOpsTest extends HBaseMetadataTestCase {
 
         this.createTestMetadata();
 
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
-
-        fileSystem = FileSystem.get(hconf);
+        fileSystem = HadoopUtil.getWorkingFileSystem();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index fc2b982..0f351b0 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -379,7 +379,7 @@ public class HiveMRInput implements IMRInput {
 
         private void rmdirOnHDFS(String path) throws IOException {
             Path externalDataPath = new Path(path);
-            FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
             if (fs.exists(externalDataPath)) {
                 fs.delete(externalDataPath, true);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index dde3584..2f4fded 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -223,7 +223,7 @@ public class KafkaMRInput implements IMRInput {
 
         private void rmdirOnHDFS(String path) throws IOException {
             Path externalDataPath = new Path(path);
-            FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
             if (fs.exists(externalDataPath)) {
                 fs.delete(externalDataPath, true);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 1d19983..2a12984 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -27,7 +27,6 @@ import java.util.NavigableSet;
 import java.util.TreeSet;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -49,6 +48,7 @@ import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -201,8 +201,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] value = r.getValue(B_FAMILY, B_COLUMN);
         if (value.length == 0) {
             Path redirectPath = bigCellHDFSPath(resPath);
-            Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-            FileSystem fileSystem = FileSystem.get(hconf);
+            FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
 
             return fileSystem.open(redirectPath);
         } else {
@@ -292,8 +291,7 @@ public class HBaseResourceStore extends ResourceStore {
 
             if (hdfsResourceExist) { // remove hdfs cell value
                 Path redirectPath = bigCellHDFSPath(resPath);
-                Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-                FileSystem fileSystem = FileSystem.get(hconf);
+                FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
 
                 if (fileSystem.exists(redirectPath)) {
                     fileSystem.delete(redirectPath, true);
@@ -339,8 +337,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
         Path redirectPath = bigCellHDFSPath(resPath);
-        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
+        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
 
         if (fileSystem.exists(redirectPath)) {
             fileSystem.delete(redirectPath, true);

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
index 46a828e..fbe64d9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
@@ -132,8 +132,7 @@ public class DeprecatedGCStep extends AbstractExecutable {
 
         List<String> oldHdfsPaths = this.getOldHdsfPaths();
         if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
-            FileSystem fileSystem = FileSystem.get(hconf);
+            FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
             for (String path : oldHdfsPaths) {
                 if (path.endsWith("*"))
                     path = path.substring(0, path.length() - 1);

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index fbfd582..89baf95 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -59,7 +59,7 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
         try {
             config = new JobEngineConfig(context.getConfig());
             List<String> toDeletePaths = getDeletePaths();
-            dropHdfsPathOnCluster(toDeletePaths, FileSystem.get(HadoopUtil.getCurrentConfiguration()));
+            dropHdfsPathOnCluster(toDeletePaths, HadoopUtil.getWorkingFileSystem());
 
             if (StringUtils.isNotEmpty(context.getConfig().getHBaseClusterFs())) {
                 dropHdfsPathOnCluster(toDeletePaths, FileSystem.get(HBaseConnection.getCurrentHBaseConfiguration()));

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
index 8f2fc80..1d66d3e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
@@ -55,7 +55,7 @@ public class SequenceFileCuboidWriter extends KVGTRecordWriter {
             JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "SYSTEM");
             String cuboidRoot = jobBuilderSupport.getCuboidRootPath(cubeSegment);
             Path cuboidPath = new Path(cuboidRoot);
-            FileSystem fs = HadoopUtil.getFileSystem(cuboidRoot);
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
             try {
                 if (fs.exists(cuboidPath)) {
                     fs.delete(cuboidPath, true);

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 2e682b1..31864f6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -55,6 +55,7 @@ import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.DataModelDesc;
@@ -136,7 +137,7 @@ public class CubeMigrationCLI {
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
         hbaseAdmin = new HBaseAdmin(conf);
 
-        hdfsFS = FileSystem.get(new Configuration());
+        hdfsFS = HadoopUtil.getWorkingFileSystem();
 
         operations = new ArrayList<Opt>();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 9fe5a23..d1a74ad 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -50,6 +50,7 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -180,7 +181,7 @@ public class StorageCleanupJob extends AbstractApplication {
         JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
 
-        FileSystem fs = FileSystem.get(conf);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
         List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
         // GlobFilter filter = new
         // GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 40306c9..5269195 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -55,6 +55,7 @@ import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.DataModelDesc;
@@ -134,7 +135,7 @@ public class CubeMigrationCLI {
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
         hbaseAdmin = new HBaseAdmin(conf);
 
-        hdfsFS = FileSystem.get(new Configuration());
+        hdfsFS = HadoopUtil.getWorkingFileSystem();
 
         operations = new ArrayList<Opt>();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/399f0c45/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 05e0142..b3e2ec0 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -185,7 +185,7 @@ public class StorageCleanupJob extends AbstractApplication {
         JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
 
-        FileSystem fs = FileSystem.get(conf);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
         List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
         // GlobFilter filter = new
         // GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
@@ -342,7 +342,7 @@ public class StorageCleanupJob extends AbstractApplication {
                     if (segmentId2JobId.containsKey(segmentId)) {
                         String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), segmentId2JobId.get(segmentId)) + "/" + tableToDelete;
                         Path externalDataPath = new Path(path);
-                        FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+                        FileSystem fs = HadoopUtil.getWorkingFileSystem();
                         if (fs.exists(externalDataPath)) {
                             fs.delete(externalDataPath, true);
                             logger.info("Hive table {}'s external path {} deleted", tableToDelete, path);