You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/19 01:46:41 UTC

[05/50] [abbrv] incubator-kylin git commit: KYLIN-957 Support HBase in a separate cluster

KYLIN-957 Support HBase in a separate cluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2748643a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2748643a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2748643a

Branch: refs/heads/master
Commit: 2748643a945fc248c06bae301f89b08d0c79fe2d
Parents: 9b805e5
Author: sunyerui <su...@gmail.com>
Authored: Wed Aug 26 18:39:03 2015 +0800
Committer: Luke Han <lu...@apache.org>
Committed: Sun Sep 6 14:37:59 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    | 12 +++
 .../common/persistence/HBaseResourceStore.java  |  4 +-
 .../apache/kylin/common/util/HadoopUtil.java    | 49 +++++++++++-
 .../persistence/HBaseResourceStoreTest.java     |  2 +-
 .../kylin/common/util/HadoopUtilTest.java       | 83 ++++++++++++++++++++
 conf/kylin.properties                           |  8 ++
 .../apache/kylin/job/AbstractJobBuilder.java    |  8 +-
 .../apache/kylin/job/cube/CubingJobBuilder.java | 13 ++-
 .../kylin/job/cube/GarbageCollectionStep.java   | 36 +++++----
 .../kylin/job/hadoop/AbstractHadoopJob.java     |  2 +-
 .../kylin/job/hadoop/hbase/BulkLoadJob.java     |  3 +-
 .../kylin/job/hadoop/hbase/CreateHTableJob.java |  3 +-
 .../kylin/job/tools/DeployCoprocessorCLI.java   |  2 +-
 13 files changed, 198 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index deb2eda..76031c2 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -94,6 +94,10 @@ public class KylinConfig {
 
     public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
 
+    public static final String KYLIN_HADOOP_CLUSTER_FS = "kylin.hadoop.cluster.fs";
+
+    public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs";
+
     public static final String HIVE_PASSWORD = "hive.password";
 
     public static final String HIVE_USER = "hive.user";
@@ -280,6 +284,14 @@ public class KylinConfig {
         return root + getMetadataUrlPrefix() + "/";
     }
 
+    public String getHadoopClusterFs() {
+        return getOptional(KYLIN_HADOOP_CLUSTER_FS, "");
+    }
+
+    public String getHBaseClusterFs() {
+        return getOptional(KYLIN_HBASE_CLUSTER_FS, "");
+    }
+
     public String getKylinJobLogDir() {
         return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs");
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
index e2a4b12..37b8f8d 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
@@ -179,7 +179,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] value = r.getValue(B_FAMILY, B_COLUMN);
         if (value.length == 0) {
             Path redirectPath = bigCellHDFSPath(resPath);
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
+            Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
             FileSystem fileSystem = FileSystem.get(hconf);
 
             return fileSystem.open(redirectPath);
@@ -297,7 +297,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
         Path redirectPath = bigCellHDFSPath(resPath);
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
 
         if (fileSystem.exists(redirectPath)) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index fcefcf2..43b2f29 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -29,8 +29,10 @@ import java.util.regex.Pattern;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,17 +41,40 @@ public class HadoopUtil {
 
     private static ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
 
+    private static ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>();
+
     public static void setCurrentConfiguration(Configuration conf) {
         hadoopConfig.set(conf);
     }
 
+    public static void setCurrentHBaseConfiguration(Configuration conf) {
+        hbaseConfig.set(conf);
+    }
+
     public static Configuration getCurrentConfiguration() {
         if (hadoopConfig.get() == null) {
-            hadoopConfig.set(new Configuration());
+            Configuration configuration = new Configuration();
+            String hadoopClusterFs = KylinConfig.getInstanceFromEnv().getHadoopClusterFs();
+            if (hadoopClusterFs != null && !hadoopClusterFs.equals("")) {
+                configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hadoopClusterFs);
+            }
+            hadoopConfig.set(configuration);
         }
         return hadoopConfig.get();
     }
 
+    public static Configuration getCurrentHBaseConfiguration() {
+        if (hbaseConfig.get() == null) {
+            Configuration configuration = HBaseConfiguration.create(new Configuration());
+            String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
+            if (hbaseClusterFs != null && !hbaseClusterFs.equals("")) {
+                configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
+            }
+            hbaseConfig.set(configuration);
+        }
+        return hbaseConfig.get();
+    }
+
     public static FileSystem getFileSystem(String path) throws IOException {
         return FileSystem.get(makeURI(path), getCurrentConfiguration());
     }
@@ -62,6 +87,24 @@ public class HadoopUtil {
         }
     }
 
+    public static String makeQualifiedPathInHadoopCluster(String path) {
+        try {
+            FileSystem fs = FileSystem.get(getCurrentConfiguration());
+            return fs.makeQualified(new Path(path)).toString();
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Cannot create FileSystem from current hadoop cluster conf", e);
+        }
+    }
+
+    public static String makeQualifiedPathInHBaseCluster(String path) {
+        try {
+            FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration());
+            return fs.makeQualified(new Path(path)).toString();
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e);
+        }
+    }
+
     /**
      * e.g.
      * 0. hbase (recommended way)
@@ -116,6 +159,10 @@ public class HadoopUtil {
         conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
         // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
 
+        String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
+        if (hbaseClusterFs != null && !hbaseClusterFs.equals("")) {
+            conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
+        }
         return conf;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java b/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java
index c9e8063..75625fb 100644
--- a/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java
+++ b/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java
@@ -77,7 +77,7 @@ public class HBaseResourceStoreTest extends HBaseMetadataTestCase {
             assertEquals(content, t);
 
             Path redirectPath = ((HBaseResourceStore) store).bigCellHDFSPath(path);
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
+            Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
             FileSystem fileSystem = FileSystem.get(hconf);
             assertTrue(fileSystem.exists(redirectPath));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
new file mode 100644
index 0000000..c380933
--- /dev/null
+++ b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.kylin.common.KylinConfig;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by sunyerui on 15/8/26.
+ * Tests for HadoopUtil
+ */
+public class HadoopUtilTest {
+
+  @BeforeClass
+  public static void beforeClass() {
+    System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+  }
+
+  @After
+  public void after() {
+    HadoopUtil.setCurrentConfiguration(null);
+    HadoopUtil.setCurrentHBaseConfiguration(null);
+  }
+
+  @Test
+  public void testGetCurrentConfiguration() throws Exception {
+    KylinConfig config = KylinConfig.getInstanceFromEnv();
+    config.setProperty(KylinConfig.KYLIN_HADOOP_CLUSTER_FS, "hdfs://hadoop-cluster/");
+
+    Configuration conf = HadoopUtil.getCurrentConfiguration();
+    assertEquals("hdfs://hadoop-cluster/", conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+  }
+
+  @Test
+  public void testGetCurrentHBaseConfiguration() throws Exception {
+    KylinConfig config = KylinConfig.getInstanceFromEnv();
+    config.setProperty(KylinConfig.KYLIN_HBASE_CLUSTER_FS, "hdfs://hbase-cluster/");
+
+    Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
+    assertEquals("hdfs://hbase-cluster/", conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+  }
+
+  @Test
+  public void testMakeQualifiedPathInHadoopCluster() throws Exception {
+    KylinConfig config = KylinConfig.getInstanceFromEnv();
+    config.setProperty(KylinConfig.KYLIN_HADOOP_CLUSTER_FS, "file:/");
+
+    String path = HadoopUtil.makeQualifiedPathInHadoopCluster("/path/to/test/hadoop");
+    assertEquals("file:/path/to/test/hadoop", path);
+  }
+
+  @Test
+  public void testMakeQualifiedPathInHBaseCluster() throws Exception {
+    KylinConfig config = KylinConfig.getInstanceFromEnv();
+    config.setProperty(KylinConfig.KYLIN_HBASE_CLUSTER_FS, "file:/");
+
+    String path = HadoopUtil.makeQualifiedPathInHBaseCluster("/path/to/test/hbase");
+    assertEquals("file:/path/to/test/hbase", path);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/conf/kylin.properties b/conf/kylin.properties
index 8c7c647..af860bd 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -29,6 +29,14 @@ kylin.storage.url=hbase
 # Temp folder in hdfs, make sure user has the right access to the hdfs directory
 kylin.hdfs.working.dir=/kylin
 
+# Hadoop cluster file system, which serves Hive and MapReduce, in the form hdfs://hadoop-cluster/
+# Leave empty to use the default fs configured by the local core-site.xml
+kylin.hadoop.cluster.fs=
+
+# HBase cluster file system, which serves HBase, in the form hdfs://hbase-cluster/
+# Leave empty if HBase runs on the same cluster as Hive and MapReduce
+kylin.hbase.cluster.fs=
+
 kylin.job.mapreduce.default.reduce.input.mb=500
 
 kylin.server.mode=all

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
index e6fde23..96b87c5 100644
--- a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
@@ -20,6 +20,9 @@ package org.apache.kylin.job;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -62,6 +65,7 @@ public abstract class AbstractJobBuilder {
     protected AbstractExecutable createIntermediateHiveTableStep(IJoinedFlatTableDesc intermediateTableDesc, String jobId) {
 
         final String useDatabaseHql = "USE " + engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + ";";
+        final String setClusterHql = "-hiveconf " + FileSystem.FS_DEFAULT_NAME_KEY + "=\"" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY) + "\"";
         final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobId);
         final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, getJobWorkingDir(jobId), jobId);
         String insertDataHqls;
@@ -74,7 +78,9 @@ public abstract class AbstractJobBuilder {
 
         ShellExecutable step = new ShellExecutable();
         StringBuffer buf = new StringBuffer();
-        buf.append("hive -e \"");
+        buf.append("hive ");
+        buf.append(setClusterHql);
+        buf.append(" -e \"");
         buf.append(useDatabaseHql + "\n");
         buf.append(dropTableHql + "\n");
         buf.append(createTableHql + "\n");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 747ae3c..dd71cd8 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -25,6 +25,9 @@ import java.util.List;
 import java.util.TimeZone;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.AbstractJobBuilder;
@@ -201,7 +204,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
     AbstractExecutable addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
         final String jobId = result.getId();
-        final String cuboidPath = cuboidRootPath + "*";
+        final String cuboidPath = HadoopUtil.makeQualifiedPathInHadoopCluster(cuboidRootPath + "*");
 
         result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath));
         // create htable step
@@ -240,6 +243,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             if (jobConf != null && jobConf.length() > 0) {
                 builder.append(" -conf ").append(jobConf);
             }
+            String setCluster = " -D" + FileSystem.FS_DEFAULT_NAME_KEY + "=" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY);
+            builder.append(setCluster);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -263,15 +268,15 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
     }
 
     private String getRowkeyDistributionOutputPath(CubeSegment seg) {
-        return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
     }
 
     private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) {
-        return getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
+        return HadoopUtil.makeQualifiedPathInHadoopCluster(getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns");
     }
 
     private String getHFilePath(CubeSegment seg, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
+        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
     }
 
     private MapReduceExecutable createFactDistinctColumnsStep(CubeSegment seg, String intermediateHiveTableName, String jobId) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
index 4cb4a80..b4f6e8e 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -83,10 +83,14 @@ public class GarbageCollectionStep extends AbstractExecutable {
     private void dropHiveTable(ExecutableContext context) throws IOException {
         final String hiveTable = this.getOldHiveTable();
         if (StringUtils.isNotEmpty(hiveTable)) {
-            final String dropSQL = "DROP TABLE IF EXISTS  " + hiveTable + ";";
-            final String dropHiveCMD = "hive -e \"" + dropSQL + "\"";
+            final String dropSQL = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";" + " DROP TABLE IF EXISTS  " + hiveTable + ";";
+            final String setClusterHql = "-hiveconf " + FileSystem.FS_DEFAULT_NAME_KEY + "=\"" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY) + "\"";
+            final String dropHiveCMD = "hive " + setClusterHql + " -e \"" + dropSQL + "\"";
+            logger.info("executing: " + dropHiveCMD);
             ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
             context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
+            logger.debug("Dropped Hive table " + hiveTable + " \n");
+            output.append(shellCmdOutput.getOutput() + " \n");
             output.append("Dropped Hive table " + hiveTable + " \n");
         }
 
@@ -129,27 +133,31 @@ public class GarbageCollectionStep extends AbstractExecutable {
         }
     }
 
-    private void dropHdfsPath(ExecutableContext context) throws IOException {
+    private void dropFileSystemPath(FileSystem fs, Path p) throws IOException {
+        Path path = fs.makeQualified(p);
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+            logger.debug("Dropped HDFS path: " + path);
+            output.append("Dropped HDFS path  \"" + path + "\" \n");
+        } else {
+            logger.debug("HDFS path not exists: " + path);
+            output.append("HDFS path not exists: \"" + path + "\" \n");
+        }
+    }
 
+    private void dropHdfsPath(ExecutableContext context) throws IOException {
         List<String> oldHdfsPaths = this.getOldHdsfPaths();
         if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
-            FileSystem fileSystem = FileSystem.get(hconf);
+            FileSystem hadoopFs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
+            FileSystem hbaseFs = FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration());
             for (String path : oldHdfsPaths) {
                 if (path.endsWith("*"))
                     path = path.substring(0, path.length() - 1);
 
                 Path oldPath = new Path(path);
-                if (fileSystem.exists(oldPath)) {
-                    fileSystem.delete(oldPath, true);
-                    logger.debug("Dropped HDFS path: " + path);
-                    output.append("Dropped HDFS path  \"" + path + "\" \n");
-                } else {
-                    logger.debug("HDFS path not exists: " + path);
-                    output.append("HDFS path not exists: \"" + path + "\" \n");
-                }
+                dropFileSystemPath(hadoopFs, oldPath);
+                dropFileSystemPath(hbaseFs, oldPath);
             }
-
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 6ad89d6..a995649 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -295,7 +295,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     }
 
     protected void deletePath(Configuration conf, Path path) throws IOException {
-        FileSystem fs = FileSystem.get(conf);
+        FileSystem fs = FileSystem.get(path.toUri(), conf);
         if (fs.exists(path)) {
             fs.delete(path, true);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
index 2608085..692d53e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -59,7 +60,7 @@ public class BulkLoadJob extends AbstractHadoopJob {
             // end with "/"
             String input = getOptionValue(OPTION_INPUT_PATH);
 
-            Configuration conf = HBaseConfiguration.create(getConf());
+            Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
             FileSystem fs = FileSystem.get(conf);
 
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index f114b5b..027c0ca 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -79,7 +80,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
         tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
 
-        Configuration conf = HBaseConfiguration.create(getConf());
+        Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
         HBaseAdmin admin = new HBaseAdmin(conf);
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
index a28477e..89472b2 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
@@ -106,7 +106,7 @@ public class DeployCoprocessorCLI {
 
     private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
 
         String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();