You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/19 01:46:41 UTC
[05/50] [abbrv] incubator-kylin git commit: KYLIN-957 Support HBase
in a separate cluster
KYLIN-957 Support HBase in a separate cluster
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2748643a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2748643a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2748643a
Branch: refs/heads/master
Commit: 2748643a945fc248c06bae301f89b08d0c79fe2d
Parents: 9b805e5
Author: sunyerui <su...@gmail.com>
Authored: Wed Aug 26 18:39:03 2015 +0800
Committer: Luke Han <lu...@apache.org>
Committed: Sun Sep 6 14:37:59 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfig.java | 12 +++
.../common/persistence/HBaseResourceStore.java | 4 +-
.../apache/kylin/common/util/HadoopUtil.java | 49 +++++++++++-
.../persistence/HBaseResourceStoreTest.java | 2 +-
.../kylin/common/util/HadoopUtilTest.java | 83 ++++++++++++++++++++
conf/kylin.properties | 8 ++
.../apache/kylin/job/AbstractJobBuilder.java | 8 +-
.../apache/kylin/job/cube/CubingJobBuilder.java | 13 ++-
.../kylin/job/cube/GarbageCollectionStep.java | 36 +++++----
.../kylin/job/hadoop/AbstractHadoopJob.java | 2 +-
.../kylin/job/hadoop/hbase/BulkLoadJob.java | 3 +-
.../kylin/job/hadoop/hbase/CreateHTableJob.java | 3 +-
.../kylin/job/tools/DeployCoprocessorCLI.java | 2 +-
13 files changed, 198 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index deb2eda..76031c2 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -94,6 +94,10 @@ public class KylinConfig {
public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
+ public static final String KYLIN_HADOOP_CLUSTER_FS = "kylin.hadoop.cluster.fs";
+
+ public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs";
+
public static final String HIVE_PASSWORD = "hive.password";
public static final String HIVE_USER = "hive.user";
@@ -280,6 +284,14 @@ public class KylinConfig {
return root + getMetadataUrlPrefix() + "/";
}
+ public String getHadoopClusterFs() {
+ return getOptional(KYLIN_HADOOP_CLUSTER_FS, "");
+ }
+
+ public String getHBaseClusterFs() {
+ return getOptional(KYLIN_HBASE_CLUSTER_FS, "");
+ }
+
public String getKylinJobLogDir() {
return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs");
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
index e2a4b12..37b8f8d 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
@@ -179,7 +179,7 @@ public class HBaseResourceStore extends ResourceStore {
byte[] value = r.getValue(B_FAMILY, B_COLUMN);
if (value.length == 0) {
Path redirectPath = bigCellHDFSPath(resPath);
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
return fileSystem.open(redirectPath);
@@ -297,7 +297,7 @@ public class HBaseResourceStore extends ResourceStore {
private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
Path redirectPath = bigCellHDFSPath(resPath);
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
if (fileSystem.exists(redirectPath)) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index fcefcf2..43b2f29 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -29,8 +29,10 @@ import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,17 +41,40 @@ public class HadoopUtil {
private static ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
+ private static ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>();
+
public static void setCurrentConfiguration(Configuration conf) {
hadoopConfig.set(conf);
}
+ public static void setCurrentHBaseConfiguration(Configuration conf) {
+ hbaseConfig.set(conf);
+ }
+
public static Configuration getCurrentConfiguration() {
if (hadoopConfig.get() == null) {
- hadoopConfig.set(new Configuration());
+ Configuration configuration = new Configuration();
+ String hadoopClusterFs = KylinConfig.getInstanceFromEnv().getHadoopClusterFs();
+ if (hadoopClusterFs != null && !hadoopClusterFs.equals("")) {
+ configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hadoopClusterFs);
+ }
+ hadoopConfig.set(configuration);
}
return hadoopConfig.get();
}
+ public static Configuration getCurrentHBaseConfiguration() {
+ if (hbaseConfig.get() == null) {
+ Configuration configuration = HBaseConfiguration.create(new Configuration());
+ String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
+ if (hbaseClusterFs != null && !hbaseClusterFs.equals("")) {
+ configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
+ }
+ hbaseConfig.set(configuration);
+ }
+ return hbaseConfig.get();
+ }
+
public static FileSystem getFileSystem(String path) throws IOException {
return FileSystem.get(makeURI(path), getCurrentConfiguration());
}
@@ -62,6 +87,24 @@ public class HadoopUtil {
}
}
+ public static String makeQualifiedPathInHadoopCluster(String path) {
+ try {
+ FileSystem fs = FileSystem.get(getCurrentConfiguration());
+ return fs.makeQualified(new Path(path)).toString();
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Cannot create FileSystem from current hadoop cluster conf", e);
+ }
+ }
+
+ public static String makeQualifiedPathInHBaseCluster(String path) {
+ try {
+ FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration());
+ return fs.makeQualified(new Path(path)).toString();
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e);
+ }
+ }
+
/**
* e.g.
* 0. hbase (recommended way)
@@ -116,6 +159,10 @@ public class HadoopUtil {
conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
// conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
+ String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
+ if (hbaseClusterFs != null && !hbaseClusterFs.equals("")) {
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
+ }
return conf;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java b/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java
index c9e8063..75625fb 100644
--- a/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java
+++ b/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java
@@ -77,7 +77,7 @@ public class HBaseResourceStoreTest extends HBaseMetadataTestCase {
assertEquals(content, t);
Path redirectPath = ((HBaseResourceStore) store).bigCellHDFSPath(path);
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
assertTrue(fileSystem.exists(redirectPath));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
new file mode 100644
index 0000000..c380933
--- /dev/null
+++ b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.kylin.common.KylinConfig;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by sunyerui on 15/8/26.
+ * Tests for HadoopUtil
+ */
+public class HadoopUtilTest {
+
+ @BeforeClass
+ public static void beforeClass() {
+ System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ }
+
+ @After
+ public void after() {
+ HadoopUtil.setCurrentConfiguration(null);
+ HadoopUtil.setCurrentHBaseConfiguration(null);
+ }
+
+ @Test
+ public void testGetCurrentConfiguration() throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ config.setProperty(KylinConfig.KYLIN_HADOOP_CLUSTER_FS, "hdfs://hadoop-cluster/");
+
+ Configuration conf = HadoopUtil.getCurrentConfiguration();
+ assertEquals("hdfs://hadoop-cluster/", conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+ }
+
+ @Test
+ public void testGetCurrentHBaseConfiguration() throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ config.setProperty(KylinConfig.KYLIN_HBASE_CLUSTER_FS, "hdfs://hbase-cluster/");
+
+ Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
+ assertEquals("hdfs://hbase-cluster/", conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+ }
+
+ @Test
+ public void testMakeQualifiedPathInHadoopCluster() throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ config.setProperty(KylinConfig.KYLIN_HADOOP_CLUSTER_FS, "file:/");
+
+ String path = HadoopUtil.makeQualifiedPathInHadoopCluster("/path/to/test/hadoop");
+ assertEquals("file:/path/to/test/hadoop", path);
+ }
+
+ @Test
+ public void testMakeQualifiedPathInHBaseCluster() throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ config.setProperty(KylinConfig.KYLIN_HBASE_CLUSTER_FS, "file:/");
+
+ String path = HadoopUtil.makeQualifiedPathInHBaseCluster("/path/to/test/hbase");
+ assertEquals("file:/path/to/test/hbase", path);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/conf/kylin.properties b/conf/kylin.properties
index 8c7c647..af860bd 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -29,6 +29,14 @@ kylin.storage.url=hbase
# Temp folder in hdfs, make sure user has the right access to the hdfs directory
kylin.hdfs.working.dir=/kylin
+# Hadoop Cluster FileSystem, which serving hive and mapreduce, format as hdfs://hadoop-cluster/
+# leave empty if using default fs configured by local core-site.xml
+kylin.hadoop.cluster.fs=
+
+# HBase Cluster FileSystem, which serving hbase, format as hdfs://hbase-cluster/
+# leave empty if hbase running on same cluster with hive and mapreduce
+kylin.hbase.cluster.fs=
+
kylin.job.mapreduce.default.reduce.input.mb=500
kylin.server.mode=all
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
index e6fde23..96b87c5 100644
--- a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
@@ -20,6 +20,9 @@ package org.apache.kylin.job;
import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -62,6 +65,7 @@ public abstract class AbstractJobBuilder {
protected AbstractExecutable createIntermediateHiveTableStep(IJoinedFlatTableDesc intermediateTableDesc, String jobId) {
final String useDatabaseHql = "USE " + engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + ";";
+ final String setClusterHql = "-hiveconf " + FileSystem.FS_DEFAULT_NAME_KEY + "=\"" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY) + "\"";
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobId);
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, getJobWorkingDir(jobId), jobId);
String insertDataHqls;
@@ -74,7 +78,9 @@ public abstract class AbstractJobBuilder {
ShellExecutable step = new ShellExecutable();
StringBuffer buf = new StringBuffer();
- buf.append("hive -e \"");
+ buf.append("hive ");
+ buf.append(setClusterHql);
+ buf.append(" -e \"");
buf.append(useDatabaseHql + "\n");
buf.append(dropTableHql + "\n");
buf.append(createTableHql + "\n");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 747ae3c..dd71cd8 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -25,6 +25,9 @@ import java.util.List;
import java.util.TimeZone;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.AbstractJobBuilder;
@@ -201,7 +204,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
AbstractExecutable addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
final String jobId = result.getId();
- final String cuboidPath = cuboidRootPath + "*";
+ final String cuboidPath = HadoopUtil.makeQualifiedPathInHadoopCluster(cuboidRootPath + "*");
result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath));
// create htable step
@@ -240,6 +243,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
if (jobConf != null && jobConf.length() > 0) {
builder.append(" -conf ").append(jobConf);
}
+ String setCluster = " -D" + FileSystem.FS_DEFAULT_NAME_KEY + "=" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY);
+ builder.append(setCluster);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -263,15 +268,15 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
}
private String getRowkeyDistributionOutputPath(CubeSegment seg) {
- return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+ return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
}
private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) {
- return getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
+ return HadoopUtil.makeQualifiedPathInHadoopCluster(getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns");
}
private String getHFilePath(CubeSegment seg, String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
+ return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
}
private MapReduceExecutable createFactDistinctColumnsStep(CubeSegment seg, String intermediateHiveTableName, String jobId) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
index 4cb4a80..b4f6e8e 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -83,10 +83,14 @@ public class GarbageCollectionStep extends AbstractExecutable {
private void dropHiveTable(ExecutableContext context) throws IOException {
final String hiveTable = this.getOldHiveTable();
if (StringUtils.isNotEmpty(hiveTable)) {
- final String dropSQL = "DROP TABLE IF EXISTS " + hiveTable + ";";
- final String dropHiveCMD = "hive -e \"" + dropSQL + "\"";
+ final String dropSQL = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";" + " DROP TABLE IF EXISTS " + hiveTable + ";";
+ final String setClusterHql = "-hiveconf " + FileSystem.FS_DEFAULT_NAME_KEY + "=\"" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY) + "\"";
+ final String dropHiveCMD = "hive " + setClusterHql + " -e \"" + dropSQL + "\"";
+ logger.info("executing: " + dropHiveCMD);
ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
+ logger.debug("Dropped Hive table " + hiveTable + " \n");
+ output.append(shellCmdOutput.getOutput() + " \n");
output.append("Dropped Hive table " + hiveTable + " \n");
}
@@ -129,27 +133,31 @@ public class GarbageCollectionStep extends AbstractExecutable {
}
}
- private void dropHdfsPath(ExecutableContext context) throws IOException {
+ private void dropFileSystemPath(FileSystem fs, Path p) throws IOException {
+ Path path = fs.makeQualified(p);
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ logger.debug("Dropped HDFS path: " + path);
+ output.append("Dropped HDFS path \"" + path + "\" \n");
+ } else {
+ logger.debug("HDFS path not exists: " + path);
+ output.append("HDFS path not exists: \"" + path + "\" \n");
+ }
+ }
+ private void dropHdfsPath(ExecutableContext context) throws IOException {
List<String> oldHdfsPaths = this.getOldHdsfPaths();
if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
+ FileSystem hadoopFs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
+ FileSystem hbaseFs = FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration());
for (String path : oldHdfsPaths) {
if (path.endsWith("*"))
path = path.substring(0, path.length() - 1);
Path oldPath = new Path(path);
- if (fileSystem.exists(oldPath)) {
- fileSystem.delete(oldPath, true);
- logger.debug("Dropped HDFS path: " + path);
- output.append("Dropped HDFS path \"" + path + "\" \n");
- } else {
- logger.debug("HDFS path not exists: " + path);
- output.append("HDFS path not exists: \"" + path + "\" \n");
- }
+ dropFileSystemPath(hadoopFs, oldPath);
+ dropFileSystemPath(hbaseFs, oldPath);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 6ad89d6..a995649 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -295,7 +295,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
protected void deletePath(Configuration conf, Path path) throws IOException {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
if (fs.exists(path)) {
fs.delete(path, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
index 2608085..692d53e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
@@ -59,7 +60,7 @@ public class BulkLoadJob extends AbstractHadoopJob {
// end with "/"
String input = getOptionValue(OPTION_INPUT_PATH);
- Configuration conf = HBaseConfiguration.create(getConf());
+ Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fs = FileSystem.get(conf);
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index f114b5b..027c0ca 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
@@ -79,7 +80,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
- Configuration conf = HBaseConfiguration.create(getConf());
+ Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
HBaseAdmin admin = new HBaseAdmin(conf);
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2748643a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
index a28477e..89472b2 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
@@ -106,7 +106,7 @@ public class DeployCoprocessorCLI {
private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();