Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/19 01:47:23 UTC
[47/50] [abbrv] incubator-kylin git commit: KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path
KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/fd172821
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/fd172821
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/fd172821
Branch: refs/heads/master
Commit: fd172821d9070b1bd704291a164569dc9d920045
Parents: 8fd1404
Author: sunyerui <su...@gmail.com>
Authored: Fri Sep 11 00:04:42 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Sep 16 21:02:26 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/job/cube/CubingJobBuilder.java | 47 ++++++++++++--------
.../kylin/job/cube/GarbageCollectionStep.java | 38 ++++++++++++----
2 files changed, 58 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
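For context: the intermediate flat table Kylin builds in Hive is an EXTERNAL table, so dropping it removes only the metastore entry and leaves the backing directory on HDFS, which is the leak this patch closes by deleting the paths explicitly. A minimal sketch of that explicit delete, with a hypothetical path and a plain Configuration standing in for HadoopUtil.getCurrentConfiguration():

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ExternalTableCleanupSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration(); // stand-in for HadoopUtil.getCurrentConfiguration()
            FileSystem fs = FileSystem.get(conf);

            // Hypothetical location; Kylin derives the real one from the job working dir.
            Path tableDir = new Path("/tmp/kylin/kylin_intermediate_example");

            // DROP TABLE on an EXTERNAL Hive table leaves this directory behind,
            // so it has to be removed explicitly:
            if (fs.exists(tableDir)) {
                fs.delete(tableDir, true); // true = recursive
            }
        }
    }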
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fd172821/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index ff79286..de75f7d 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -25,8 +25,6 @@ import java.util.List;
import java.util.TimeZone;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
@@ -65,9 +63,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
final CubingJob result = initialJob(seg, "BUILD");
final String jobId = result.getId();
final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
+ final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+ final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();
// cubing
- Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result);
+ Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result, toDeletePathsOnHadoopCluster);
String intermediateHiveTableStepId = twoSteps.getFirst().getId();
String baseCuboidStepId = twoSteps.getSecond().getId();
@@ -79,7 +79,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
- result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, null));
+ toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+ result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
return result;
}
@@ -92,9 +93,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
final String jobId = result.getId();
final String appendRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/append_cuboid/";
final String mergedRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/cuboid/";
+ List<String> mergingSegmentIds = Lists.newArrayList();
+ List<String> mergingCuboidPaths = Lists.newArrayList();
+ List<String> mergingHTables = Lists.newArrayList();
+ final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+ final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();
// cubing the incremental segment
- Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result);
+ Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result, toDeletePathsOnHadoopCluster);
final String intermediateHiveTableStepId = twoSteps.getFirst().getId();
final String baseCuboidStepId = twoSteps.getSecond().getId();
@@ -103,10 +109,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
List<CubeSegment> mergingSegments = mergeSegment.getCubeInstance().getMergingSegments(mergeSegment);
Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
- List<String> mergingSegmentIds = Lists.newArrayList();
- List<String> mergingCuboidPaths = Lists.newArrayList();
- List<String> mergingHTables = Lists.newArrayList();
- List<String> toDeletePaths = Lists.newArrayList();
+
for (CubeSegment merging : mergingSegments) {
mergingSegmentIds.add(merging.getUuid());
mergingHTables.add(merging.getStorageLocationIdentifier());
@@ -115,7 +118,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
} else {
mergingCuboidPaths.add(getPathToMerge(merging));
}
- toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
+ toDeletePathsOnHadoopCluster.add(getJobWorkingDir(merging.getLastBuildJobID()));
}
// merge cuboid
@@ -126,7 +129,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
// update cube info
result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
- result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePaths));
+ toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+ result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
return result;
}
@@ -143,12 +147,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
List<String> mergingSegmentIds = Lists.newArrayList();
List<String> mergingCuboidPaths = Lists.newArrayList();
List<String> mergingHTables = Lists.newArrayList();
- List<String> toDeletePaths = Lists.newArrayList();
+ final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+ final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();
+
for (CubeSegment merging : mergingSegments) {
mergingSegmentIds.add(merging.getUuid());
mergingCuboidPaths.add(getPathToMerge(merging));
mergingHTables.add(merging.getStorageLocationIdentifier());
- toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
+ toDeletePathsOnHadoopCluster.add(getJobWorkingDir(merging.getLastBuildJobID()));
}
// merge cuboid
@@ -159,7 +165,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
// update cube info
result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
- result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePaths));
+ toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+ result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
return result;
}
@@ -171,7 +178,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
result.addTask(createMergeCuboidDataStep(seg, formattedPath, mergedCuboidPath));
}
- Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
+ Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result, List<String> toDeletePaths) {
final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
@@ -199,6 +206,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
result.addTask(createNDimensionCuboidStep(seg, cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
}
+ toDeletePaths.add(intermediateHiveTableLocation);
+ toDeletePaths.add(factDistinctColumnsPath);
+
return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
}
@@ -266,7 +276,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
}
private String getRowkeyDistributionOutputPath(CubeSegment seg, String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+ return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
}
private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) {
@@ -460,12 +470,13 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
return result;
}
- private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdsfPaths) {
+ private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdsfPaths, List<String> oldHdfsPathsOnHBaseCluster) {
GarbageCollectionStep result = new GarbageCollectionStep();
result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
result.setOldHTables(oldHtables);
result.setOldHiveTable(hiveIntermediateTable);
- result.setOldHdsfPaths(oldHdsfPaths);
+ result.setOldHdfsPaths(oldHdsfPaths);
+ result.setOldHdfsPathsOnHBaseCluster(oldHdfsPathsOnHBaseCluster);
return result;
}
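The other notable change in this file is getRowkeyDistributionOutputPath: it now returns a path fully qualified against the HBase cluster, presumably so that a later lookup or delete cannot resolve the bare path against the wrong default FileSystem when the Hadoop and HBase clusters differ. A sketch of what qualifying a path means with the plain Hadoop API (the Configuration below is a hypothetical stand-in for HadoopUtil.getCurrentHBaseConfiguration()):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class QualifiedPathSketch {
        public static void main(String[] args) throws IOException {
            Configuration hbaseConf = new Configuration(); // stand-in for the HBase cluster's conf
            FileSystem hbaseFs = FileSystem.get(hbaseConf);

            // A bare path is ambiguous across clusters; qualifying it pins the
            // scheme and authority of the HBase cluster's default FileSystem,
            // e.g. hdfs://hbase-nn:8020/kylin/example_job/rowkey_stats
            Path bare = new Path("/kylin/example_job/rowkey_stats"); // hypothetical path
            Path qualified = hbaseFs.makeQualified(bare);
            System.out.println(qualified);
        }
    }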
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fd172821/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
index 72cad96..641454c 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -55,6 +55,8 @@ public class GarbageCollectionStep extends AbstractExecutable {
private static final String OLD_HDFS_PATHS = "oldHdfsPaths";
+ private static final String OLD_HDFS_PATHS_ON_HBASE_CLUSTER = "oldHdfsPathsOnHBaseCluster";
+
private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
private StringBuffer output;
@@ -69,8 +71,9 @@ public class GarbageCollectionStep extends AbstractExecutable {
try {
dropHBaseTable(context);
- dropHdfsPath(context);
dropHiveTable(context);
+ dropHdfsPath(context);
+ dropHdfsPathOnHBaseCluster(context);
} catch (IOException e) {
logger.error("job:" + getId() + " execute finished with exception", e);
output.append("\n").append(e.getLocalizedMessage());
@@ -131,13 +134,11 @@ public class GarbageCollectionStep extends AbstractExecutable {
}
}
}
-
- private void dropHdfsPath(ExecutableContext context) throws IOException {
- List<String> oldHdfsPaths = this.getOldHdsfPaths();
+ private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
- Configuration hconf = HadoopUtil.getCurrentConfiguration();
- FileSystem fileSystem = FileSystem.get(hconf);
+ logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri());
+ output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n");
for (String path : oldHdfsPaths) {
if (path.endsWith("*"))
path = path.substring(0, path.length() - 1);
@@ -152,10 +153,21 @@ public class GarbageCollectionStep extends AbstractExecutable {
output.append("HDFS path not exists: \"" + path + "\" \n");
}
}
-
}
}
+ private void dropHdfsPath(ExecutableContext context) throws IOException {
+ List<String> oldHdfsPaths = this.getOldHdfsPaths();
+ FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentConfiguration());
+ dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
+ }
+
+ private void dropHdfsPathOnHBaseCluster(ExecutableContext context) throws IOException {
+ List<String> oldHdfsPaths = this.getOldHdfsPathsOnHBaseCluster();
+ FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration());
+ dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
+ }
+
public void setOldHTables(List<String> tables) {
setArrayParam(OLD_HTABLES, tables);
}
@@ -164,14 +176,22 @@ public class GarbageCollectionStep extends AbstractExecutable {
return getArrayParam(OLD_HTABLES);
}
- public void setOldHdsfPaths(List<String> paths) {
+ public void setOldHdfsPaths(List<String> paths) {
setArrayParam(OLD_HDFS_PATHS, paths);
}
- private List<String> getOldHdsfPaths() {
+ private List<String> getOldHdfsPaths() {
return getArrayParam(OLD_HDFS_PATHS);
}
+ public void setOldHdfsPathsOnHBaseCluster(List<String> paths) {
+ setArrayParam(OLD_HDFS_PATHS_ON_HBASE_CLUSTER, paths);
+ }
+
+ private List<String> getOldHdfsPathsOnHBaseCluster() {
+ return getArrayParam(OLD_HDFS_PATHS_ON_HBASE_CLUSTER);
+ }
+
private void setArrayParam(String paramKey, List<String> paramValues) {
setParam(paramKey, StringUtils.join(paramValues, ","));
}