You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/22 05:40:35 UTC
[1/2] incubator-kylin git commit: KYLIN-978 GarbageCollectionStep
dropped Hive Intermediate Table but didn't drop external hdfs path
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging 8581df4a2 -> 985e1fb20
KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/738422e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/738422e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/738422e1
Branch: refs/heads/2.x-staging
Commit: 738422e1030097d887956e769a4bf754b06b750c
Parents: 8581df4
Author: sunyerui <su...@gmail.com>
Authored: Fri Sep 18 19:11:31 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 22 11:19:53 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/job/JoinedFlatTable.java | 2 +-
.../apache/kylin/source/hive/HiveMRInput.java | 19 ++-
.../storage/hbase/steps/HBaseMROutput.java | 5 +-
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 50 +++++++
.../steps/HDFSPathGarbageCollectionStep.java | 138 +++++++++++++++++++
5 files changed, 209 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 6ae3ccb..5886325 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -72,7 +72,7 @@ public class JoinedFlatTable {
ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
ddl.append("STORED AS SEQUENCEFILE" + "\n");
- ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
+ ddl.append("LOCATION '" + getTableDir(intermediateTableDesc, storageDfsDir) + "';").append("\n");
// ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
// ";\n");
return ddl.toString();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 1dcdc94..4571852 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -21,6 +21,8 @@ package org.apache.kylin.source.hive;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
@@ -134,6 +136,7 @@ public class HiveMRInput implements IMRInput {
GarbageCollectionStep step = new GarbageCollectionStep();
step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
step.setIntermediateTableIdentity(getIntermediateTableIdentity());
+ step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
jobFlow.addTask(step);
}
@@ -148,7 +151,6 @@ public class HiveMRInput implements IMRInput {
}
public static class GarbageCollectionStep extends AbstractExecutable {
-
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig config = context.getConfig();
@@ -161,6 +163,13 @@ public class HiveMRInput implements IMRInput {
try {
config.getCliCommandExecutor().execute(dropHiveCMD);
output.append("Hive table " + hiveTable + " is dropped. \n");
+
+ Path externalDataPath = new Path(getExternalDataPath());
+ FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+ if (fs.exists(externalDataPath)) {
+ fs.delete(externalDataPath, true);
+ output.append("Hive table " + hiveTable + " external data path " + externalDataPath + " is deleted. \n");
+ }
} catch (IOException e) {
logger.error("job:" + getId() + " execute finished with exception", e);
return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
@@ -177,6 +186,14 @@ public class HiveMRInput implements IMRInput {
private String getIntermediateTableIdentity() {
return getParam("oldHiveTable");
}
+
+ public void setExternalDataPath(String externalDataPath) {
+ setParam("externalDataPath", externalDataPath);
+ }
+
+ private String getExternalDataPath() {
+ return getParam("externalDataPath");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
index 8cbb7ff..c634a1d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
@@ -21,7 +21,6 @@ package org.apache.kylin.storage.hbase.steps;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMROutput;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.storage.hbase.steps.HBaseMRSteps;
public class HBaseMROutput implements IMROutput {
@@ -37,7 +36,7 @@ public class HBaseMROutput implements IMROutput {
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- // nothing to do
+ steps.addCubingGarbageCollectionSteps(jobFlow);
}
};
}
@@ -54,7 +53,7 @@ public class HBaseMROutput implements IMROutput {
@Override
public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createMergeGCStep());
+ steps.addMergingGarbageCollectionSteps(jobFlow);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index dfb4f33..f9e9b15 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -1,5 +1,6 @@
package org.apache.kylin.storage.hbase.steps;
+import java.util.ArrayList;
import java.util.List;
import org.apache.kylin.cube.CubeSegment;
@@ -129,6 +130,16 @@ public class HBaseMRSteps extends JobBuilderSupport {
return mergingHTables;
}
+ public List<String> getMergingHDFSPaths() {
+ final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+ Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
+ final List<String> mergingHDFSPaths = Lists.newArrayList();
+ for (CubeSegment merging : mergingSegments) {
+ mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
+ }
+ return mergingHDFSPaths;
+ }
+
public String getHFilePath(String jobId) {
return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
}
@@ -137,4 +148,43 @@ public class HBaseMRSteps extends JobBuilderSupport {
return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
}
+ public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+ String jobId = jobFlow.getId();
+
+ jobFlow.addTask(createMergeGCStep());
+
+ List<String> toDeletePathsOnHadoopCluster = new ArrayList<>();
+ toDeletePathsOnHadoopCluster.addAll(getMergingHDFSPaths());
+
+ List<String> toDeletePathsOnHbaseCluster = new ArrayList<>();
+ toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
+ toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+
+ HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
+ step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+ step.setJobId(jobId);
+
+ jobFlow.addTask(step);
+ }
+
+ public void addCubingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+ String jobId = jobFlow.getId();
+
+ List<String> toDeletePathsOnHadoopCluster = new ArrayList<>();
+ toDeletePathsOnHadoopCluster.add(getFactDistinctColumnsPath(jobId));
+
+ List<String> toDeletePathsOnHbaseCluster = new ArrayList<>();
+ toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
+ toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+
+ HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
+ step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+ step.setJobId(jobId);
+
+ jobFlow.addTask(step);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
new file mode 100644
index 0000000..2ae8ca8
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hbase.steps;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Created by sunyerui on 15/9/17.
+ */
+public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
+
+ private StringBuffer output;
+ private JobEngineConfig config;
+
+ public HDFSPathGarbageCollectionStep() {
+ super();
+ output = new StringBuffer();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ try {
+ config = new JobEngineConfig(context.getConfig());
+ dropHdfsPathOnCluster(getDeletePathsOnHadoopCluster(), FileSystem.get(HadoopUtil.getCurrentConfiguration()));
+ dropHdfsPathOnCluster(getDeletePathsOnHBaseCluster(), FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration()));
+ } catch (IOException e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ output.append("\n").append(e.getLocalizedMessage());
+ return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ }
+
+ private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
+ if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
+ logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri());
+ output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n");
+ for (String path : oldHdfsPaths) {
+ if (path.endsWith("*"))
+ path = path.substring(0, path.length() - 1);
+
+ Path oldPath = new Path(path);
+ if (fileSystem.exists(oldPath)) {
+ fileSystem.delete(oldPath, true);
+ logger.debug("HDFS path " + path + " is dropped.");
+ output.append("HDFS path " + path + " is dropped.\n");
+ } else {
+ logger.debug("HDFS path " + path + " not exists.");
+ output.append("HDFS path " + path + " not exists.\n");
+ }
+ // If HBase is deployed on another cluster, the job dir is left empty and should be dropped,
+ // because the rowkey_stats and hfile dirs have both been dropped.
+ if (fileSystem.listStatus(oldPath.getParent()).length == 0) {
+ Path emptyJobPath = new Path(JobBuilderSupport.getJobWorkingDir(config, getJobId()));
+ if (fileSystem.exists(emptyJobPath)) {
+ fileSystem.delete(emptyJobPath, true);
+ logger.debug("HDFS path " + emptyJobPath + " is empty and dropped.");
+ output.append("HDFS path " + emptyJobPath + " is empty and dropped.\n");
+ }
+ }
+ }
+ }
+ }
+
+ public void setDeletePathsOnHadoopCluster(List<String> deletePaths) {
+ setArrayParam("toDeletePathsOnHadoopCluster", deletePaths);
+ }
+
+ public void setDeletePathsOnHBaseCluster(List<String> deletePaths) {
+ setArrayParam("toDeletePathsOnHBaseCluster", deletePaths);
+ }
+
+ public void setJobId(String jobId) {
+ setParam("jobId", jobId);
+ }
+
+ public List<String> getDeletePathsOnHadoopCluster() {
+ return getArrayParam("toDeletePathsOnHadoopCluster");
+ }
+
+ public List<String> getDeletePathsOnHBaseCluster() {
+ return getArrayParam("toDeletePathsOnHBaseCluster");
+ }
+
+ public String getJobId() {
+ return getParam("jobId");
+ }
+
+ private void setArrayParam(String paramKey, List<String> paramValues) {
+ setParam(paramKey, StringUtils.join(paramValues, ","));
+ }
+
+ private List<String> getArrayParam(String paramKey) {
+ final String ids = getParam(paramKey);
+ if (ids != null) {
+ final String[] splitted = StringUtils.split(ids, ",");
+ ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+ for (String id : splitted) {
+ result.add(id);
+ }
+ return result;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+}
[2/2] incubator-kylin git commit: KYLIN-978 small update based on yerui’s patch
Posted by sh...@apache.org.
KYLIN-978 small update based on yerui’s patch
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/985e1fb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/985e1fb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/985e1fb2
Branch: refs/heads/2.x-staging
Commit: 985e1fb2047d296ac89b6cf1230915bbb13dfe99
Parents: 738422e
Author: shaofengshi <sh...@apache.org>
Authored: Tue Sep 22 11:40:26 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 22 11:40:26 2015 +0800
----------------------------------------------------------------------
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 22 +++++------------
.../steps/HDFSPathGarbageCollectionStep.java | 26 +++++++++-----------
2 files changed, 17 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/985e1fb2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index f9e9b15..4901512 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -153,17 +153,12 @@ public class HBaseMRSteps extends JobBuilderSupport {
jobFlow.addTask(createMergeGCStep());
- List<String> toDeletePathsOnHadoopCluster = new ArrayList<>();
- toDeletePathsOnHadoopCluster.addAll(getMergingHDFSPaths());
-
- List<String> toDeletePathsOnHbaseCluster = new ArrayList<>();
- toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
- toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+ List<String> toDeletePaths = new ArrayList<>();
+ toDeletePaths.addAll(getMergingHDFSPaths());
HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
- step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
- step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+ step.setDeletePaths(toDeletePaths);
step.setJobId(jobId);
jobFlow.addTask(step);
@@ -172,17 +167,12 @@ public class HBaseMRSteps extends JobBuilderSupport {
public void addCubingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
String jobId = jobFlow.getId();
- List<String> toDeletePathsOnHadoopCluster = new ArrayList<>();
- toDeletePathsOnHadoopCluster.add(getFactDistinctColumnsPath(jobId));
-
- List<String> toDeletePathsOnHbaseCluster = new ArrayList<>();
- toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
- toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+ List<String> toDeletePaths = new ArrayList<>();
+ toDeletePaths.add(getFactDistinctColumnsPath(jobId));
HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
- step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
- step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+ step.setDeletePaths(toDeletePaths);
step.setJobId(jobId);
jobFlow.addTask(step);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/985e1fb2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 2ae8ca8..f9f0b80 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -19,7 +19,6 @@ package org.apache.kylin.storage.hbase.steps;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.engine.mr.HadoopUtil;
@@ -40,6 +39,7 @@ import java.util.List;
*/
public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
+ public static final String TO_DELETE_PATHS = "toDeletePaths";
private StringBuffer output;
private JobEngineConfig config;
@@ -52,8 +52,12 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
try {
config = new JobEngineConfig(context.getConfig());
- dropHdfsPathOnCluster(getDeletePathsOnHadoopCluster(), FileSystem.get(HadoopUtil.getCurrentConfiguration()));
- dropHdfsPathOnCluster(getDeletePathsOnHBaseCluster(), FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration()));
+ List<String> toDeletePaths = getDeletePaths();
+ dropHdfsPathOnCluster(toDeletePaths, FileSystem.get(HadoopUtil.getCurrentConfiguration()));
+
+ if (StringUtils.isNotEmpty(context.getConfig().getHBaseClusterFs())) {
+ dropHdfsPathOnCluster(toDeletePaths, FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration()));
+ }
} catch (IOException e) {
logger.error("job:" + getId() + " execute finished with exception", e);
output.append("\n").append(e.getLocalizedMessage());
@@ -94,24 +98,16 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
}
}
- public void setDeletePathsOnHadoopCluster(List<String> deletePaths) {
- setArrayParam("toDeletePathsOnHadoopCluster", deletePaths);
- }
-
- public void setDeletePathsOnHBaseCluster(List<String> deletePaths) {
- setArrayParam("toDeletePathsOnHBaseCluster", deletePaths);
+ public void setDeletePaths(List<String> deletePaths) {
+ setArrayParam(TO_DELETE_PATHS, deletePaths);
}
public void setJobId(String jobId) {
setParam("jobId", jobId);
}
- public List<String> getDeletePathsOnHadoopCluster() {
- return getArrayParam("toDeletePathsOnHadoopCluster");
- }
-
- public List<String> getDeletePathsOnHBaseCluster() {
- return getArrayParam("toDeletePathsOnHBaseCluster");
+ public List<String> getDeletePaths() {
+ return getArrayParam(TO_DELETE_PATHS);
}
public String getJobId() {