You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/22 05:18:42 UTC

incubator-kylin git commit: KYLIN-957 Support HBase in a separate cluster

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging d7c37e0a4 -> 8581df4a2


KYLIN-957 Support HBase in a separate cluster

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8581df4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8581df4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8581df4a

Branch: refs/heads/2.x-staging
Commit: 8581df4a252c313e303d5bd6f93272a5e189ee4e
Parents: d7c37e0
Author: sunyerui <su...@gmail.com>
Authored: Wed Sep 16 00:03:29 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 22 11:17:10 2015 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                     |  4 +++
 .../org/apache/kylin/common/KylinConfig.java    |  6 ++++
 engine-mr/pom.xml                               |  5 +++
 .../org/apache/kylin/engine/mr/HadoopUtil.java  | 32 +++++++++++++++++++-
 .../kylin/storage/hbase/HBaseConnection.java    |  8 +++++
 .../kylin/storage/hbase/HBaseResourceStore.java |  4 +--
 .../kylin/storage/hbase/steps/BulkLoadJob.java  |  3 +-
 .../storage/hbase/steps/CreateHTableJob.java    |  2 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  5 +--
 .../hbase/util/DeployCoprocessorCLI.java        |  2 +-
 .../hbase/steps/ITHBaseResourceStoreTest.java   |  2 +-
 11 files changed, 64 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 8dfb05b..5b56f31 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -12,6 +12,10 @@ kylin.storage.url=hbase
 # Temp folder in hdfs, make sure user has the right access to the hdfs directory
 kylin.hdfs.working.dir=/kylin
 
+# File system of the cluster that serves HBase, in the format hdfs://hbase-cluster:8020
+# Leave empty if HBase runs on the same cluster as Hive and MapReduce
+kylin.hbase.cluster.fs=
+
 kylin.job.mapreduce.default.reduce.input.mb=500
 
 # If true, job engine will not assume that hadoop CLI reside on the same server as it self

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 43b8c4d..376327a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -104,6 +104,8 @@ public class KylinConfig implements Serializable {
 
     public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
 
+    public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs";
+
     public static final String HIVE_DATABASE_FOR_INTERMEDIATE_TABLE = "kylin.job.hive.database.for.intermediatetable";
 
     public static final String HIVE_PASSWORD = "hive.password";
@@ -293,6 +295,10 @@ public class KylinConfig implements Serializable {
         return root + getMetadataUrlPrefix() + "/";
     }
 
+    public String getHBaseClusterFs() {
+        return getOptional(KYLIN_HBASE_CLUSTER_FS, "");
+    }
+
     public String getKylinJobLogDir() {
         return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs");
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index e00a693..7a2bfe5 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -102,6 +102,11 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.mrunit</groupId>
             <artifactId>mrunit</artifactId>
             <classifier>hadoop2</classifier>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
index 1c00993..7fcbf1f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -24,16 +24,25 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HadoopUtil {
+    private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);
 
     private static ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
 
+    private static ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>();
+
     public static void setCurrentConfiguration(Configuration conf) {
         hadoopConfig.set(conf);
     }
@@ -45,6 +54,18 @@ public class HadoopUtil {
         return hadoopConfig.get();
     }
 
+    public static Configuration getCurrentHBaseConfiguration() {
+        if (hbaseConfig.get() == null) {
+            Configuration configuration = HBaseConfiguration.create(new Configuration());
+            String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
+            if (StringUtils.isNotEmpty(hbaseClusterFs)) {
+                configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
+            }
+            hbaseConfig.set(configuration);
+        }
+        return hbaseConfig.get();
+    }
+
     public static FileSystem getFileSystem(String path) throws IOException {
         return FileSystem.get(makeURI(path), getCurrentConfiguration());
     }
@@ -57,6 +78,15 @@ public class HadoopUtil {
         }
     }
 
+    public static String makeQualifiedPathInHBaseCluster(String path) {
+        try {
+            FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration());
+            return fs.makeQualified(new Path(path)).toString();
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e);
+        }
+    }
+
     public static String fixWindowsPath(String path) {
         // fix windows path
         if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
@@ -87,7 +117,7 @@ public class HadoopUtil {
     }
 
     public static void deletePath(Configuration conf, Path path) throws IOException {
-        FileSystem fs = FileSystem.get(conf);
+        FileSystem fs = FileSystem.get(path.toUri(), conf);
         if (fs.exists(path)) {
             fs.delete(path, true);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index d1bb216..16bb30a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -28,6 +28,7 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.StorageException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,6 +82,12 @@ public class HBaseConnection {
         conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
         conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
         conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
+
+        String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
+        if (StringUtils.isNotEmpty(hbaseClusterFs)) {
+            conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
+        }
+
         // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
         if (StringUtils.isEmpty(url)) {
             return conf;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 59f9e84..baec4b8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -185,7 +185,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] value = r.getValue(B_FAMILY, B_COLUMN);
         if (value.length == 0) {
             Path redirectPath = bigCellHDFSPath(resPath);
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
+            Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
             FileSystem fileSystem = FileSystem.get(hconf);
 
             return fileSystem.open(redirectPath);
@@ -304,7 +304,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
         Path redirectPath = bigCellHDFSPath(resPath);
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
 
         if (fileSystem.exists(redirectPath)) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
index 2be61f4..cbddfae 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
@@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +61,7 @@ public class BulkLoadJob extends AbstractHadoopJob {
             // end with "/"
             String input = getOptionValue(OPTION_INPUT_PATH);
 
-            Configuration conf = HBaseConfiguration.create(getConf());
+            Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
             FileSystem fs = FileSystem.get(conf);
 
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 35a35c1..eb1256c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -103,7 +103,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
 
         String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-        Configuration conf = HBaseConfiguration.create(getConf());
+        Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
 
         try {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 03b4361..dfb4f33 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -4,6 +4,7 @@ import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
@@ -129,11 +130,11 @@ public class HBaseMRSteps extends JobBuilderSupport {
     }
 
     public String getHFilePath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
+        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
     }
 
     public String getRowkeyDistributionOutputPath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 5c7d46e..21d7c38 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -108,7 +108,7 @@ public class DeployCoprocessorCLI {
 
     private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
 
         String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
index e1976cb..ba95176 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
@@ -80,7 +80,7 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase {
             assertEquals(content, t);
 
             Path redirectPath = ((HBaseResourceStore) store).bigCellHDFSPath(path);
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
+            Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
             FileSystem fileSystem = FileSystem.get(hconf);
             assertTrue(fileSystem.exists(redirectPath));