You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/22 05:18:42 UTC
incubator-kylin git commit: KYLIN-957 Support HBase in a separate cluster
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging d7c37e0a4 -> 8581df4a2
KYLIN-957 Support HBase in a separate cluster
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8581df4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8581df4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8581df4a
Branch: refs/heads/2.x-staging
Commit: 8581df4a252c313e303d5bd6f93272a5e189ee4e
Parents: d7c37e0
Author: sunyerui <su...@gmail.com>
Authored: Wed Sep 16 00:03:29 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 22 11:17:10 2015 +0800
----------------------------------------------------------------------
build/conf/kylin.properties | 4 +++
.../org/apache/kylin/common/KylinConfig.java | 6 ++++
engine-mr/pom.xml | 5 +++
.../org/apache/kylin/engine/mr/HadoopUtil.java | 32 +++++++++++++++++++-
.../kylin/storage/hbase/HBaseConnection.java | 8 +++++
.../kylin/storage/hbase/HBaseResourceStore.java | 4 +--
.../kylin/storage/hbase/steps/BulkLoadJob.java | 3 +-
.../storage/hbase/steps/CreateHTableJob.java | 2 +-
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 5 +--
.../hbase/util/DeployCoprocessorCLI.java | 2 +-
.../hbase/steps/ITHBaseResourceStoreTest.java | 2 +-
11 files changed, 64 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 8dfb05b..5b56f31 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -12,6 +12,10 @@ kylin.storage.url=hbase
# Temp folder in hdfs, make sure user has the right access to the hdfs directory
kylin.hdfs.working.dir=/kylin
+# File system URI of the cluster that serves HBase, in the form hdfs://hbase-cluster:8020
+# Leave empty if HBase runs on the same cluster as Hive and MapReduce
+kylin.hbase.cluster.fs=
+
kylin.job.mapreduce.default.reduce.input.mb=500
# If true, the job engine will not assume that the Hadoop CLI resides on the same server as itself
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 43b8c4d..376327a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -104,6 +104,8 @@ public class KylinConfig implements Serializable {
public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
+ public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs";
+
public static final String HIVE_DATABASE_FOR_INTERMEDIATE_TABLE = "kylin.job.hive.database.for.intermediatetable";
public static final String HIVE_PASSWORD = "hive.password";
@@ -293,6 +295,10 @@ public class KylinConfig implements Serializable {
return root + getMetadataUrlPrefix() + "/";
}
+ public String getHBaseClusterFs() {
+ return getOptional(KYLIN_HBASE_CLUSTER_FS, "");
+ }
+
public String getKylinJobLogDir() {
return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs");
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index e00a693..7a2bfe5 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -102,6 +102,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<classifier>hadoop2</classifier>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
index 1c00993..7fcbf1f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -24,16 +24,25 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HadoopUtil {
+ private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);
private static ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
+ private static ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>();
+
public static void setCurrentConfiguration(Configuration conf) {
hadoopConfig.set(conf);
}
@@ -45,6 +54,18 @@ public class HadoopUtil {
return hadoopConfig.get();
}
+ public static Configuration getCurrentHBaseConfiguration() {
+ if (hbaseConfig.get() == null) {
+ Configuration configuration = HBaseConfiguration.create(new Configuration());
+ String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
+ if (StringUtils.isNotEmpty(hbaseClusterFs)) {
+ configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
+ }
+ hbaseConfig.set(configuration);
+ }
+ return hbaseConfig.get();
+ }
+
public static FileSystem getFileSystem(String path) throws IOException {
return FileSystem.get(makeURI(path), getCurrentConfiguration());
}
@@ -57,6 +78,15 @@ public class HadoopUtil {
}
}
+ public static String makeQualifiedPathInHBaseCluster(String path) {
+ try {
+ FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration());
+ return fs.makeQualified(new Path(path)).toString();
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e);
+ }
+ }
+
public static String fixWindowsPath(String path) {
// fix windows path
if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
@@ -87,7 +117,7 @@ public class HadoopUtil {
}
public static void deletePath(Configuration conf, Path path) throws IOException {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
if (fs.exists(path)) {
fs.delete(path, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index d1bb216..16bb30a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -28,6 +28,7 @@ import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.StorageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +82,12 @@ public class HBaseConnection {
conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
+
+ String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
+ if (StringUtils.isNotEmpty(hbaseClusterFs)) {
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
+ }
+
// conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
if (StringUtils.isEmpty(url)) {
return conf;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 59f9e84..baec4b8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -185,7 +185,7 @@ public class HBaseResourceStore extends ResourceStore {
byte[] value = r.getValue(B_FAMILY, B_COLUMN);
if (value.length == 0) {
Path redirectPath = bigCellHDFSPath(resPath);
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
return fileSystem.open(redirectPath);
@@ -304,7 +304,7 @@ public class HBaseResourceStore extends ResourceStore {
private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
Path redirectPath = bigCellHDFSPath(resPath);
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
if (fileSystem.exists(redirectPath)) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
index 2be61f4..cbddfae 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
@@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +61,7 @@ public class BulkLoadJob extends AbstractHadoopJob {
// end with "/"
String input = getOptionValue(OPTION_INPUT_PATH);
- Configuration conf = HBaseConfiguration.create(getConf());
+ Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fs = FileSystem.get(conf);
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 35a35c1..eb1256c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -103,7 +103,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
- Configuration conf = HBaseConfiguration.create(getConf());
+ Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 03b4361..dfb4f33 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -4,6 +4,7 @@ import java.util.List;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
@@ -129,11 +130,11 @@ public class HBaseMRSteps extends JobBuilderSupport {
}
public String getHFilePath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
+ return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
}
public String getRowkeyDistributionOutputPath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+ return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 5c7d46e..21d7c38 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -108,7 +108,7 @@ public class DeployCoprocessorCLI {
private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8581df4a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
index e1976cb..ba95176 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
@@ -80,7 +80,7 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase {
assertEquals(content, t);
Path redirectPath = ((HBaseResourceStore) store).bigCellHDFSPath(path);
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
assertTrue(fileSystem.exists(redirectPath));