Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/11 05:07:58 UTC

[2/2] incubator-kylin git commit: KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path

KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/bb1b1a72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/bb1b1a72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/bb1b1a72

Branch: refs/heads/KYLIN-968-978
Commit: bb1b1a725b81803d6b6e72735156d2d9cd067a6e
Parents: 9df8849
Author: sunyerui <su...@gmail.com>
Authored: Fri Sep 11 00:04:42 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Sep 11 11:07:26 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/cube/CubingJobBuilder.java | 47 ++++++++++++--------
 .../kylin/job/cube/GarbageCollectionStep.java   | 38 ++++++++++++----
 2 files changed, 58 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
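For readers skimming the diffs below: the patch replaces the single to-delete path list with two lists, one for paths on the Hadoop (compute) cluster's HDFS and one for paths on the HBase cluster's HDFS, and the garbage collection step now deletes each list on the matching FileSystem. The standalone Java sketch below is not part of the commit; it only illustrates the cleanup pattern that the new dropHdfsPathOnCluster helper implements, with made-up configurations and paths standing in for HadoopUtil.getCurrentConfiguration(), HadoopUtil.getCurrentHBaseConfiguration(), and the real job directories.

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TwoClusterCleanupSketch {

        // Mirrors the shape of the patch's dropHdfsPathOnCluster(...): recursively
        // delete each old path on the FileSystem it belongs to, tolerating a
        // trailing "*" the way the existing code does.
        static void dropPaths(List<String> oldPaths, FileSystem fs) throws IOException {
            if (oldPaths == null || oldPaths.isEmpty()) {
                return;
            }
            for (String p : oldPaths) {
                if (p.endsWith("*")) {
                    p = p.substring(0, p.length() - 1);
                }
                Path oldPath = new Path(p);
                if (fs.exists(oldPath)) {
                    fs.delete(oldPath, true); // recursive delete, as in the patch
                }
            }
        }

        public static void main(String[] args) throws IOException {
            // Hypothetical stand-ins for HadoopUtil.getCurrentConfiguration() and
            // HadoopUtil.getCurrentHBaseConfiguration() used by the patch.
            Configuration hadoopClusterConf = new Configuration();
            Configuration hbaseClusterConf = new Configuration();

            // Made-up example paths: the intermediate Hive table and the
            // fact-distinct-columns output sit on the compute cluster, while the
            // job working dir is cleaned on the HBase cluster.
            dropPaths(Arrays.asList("/tmp/kylin/intermediate_table", "/tmp/kylin/fact_distinct"),
                    FileSystem.get(hadoopClusterConf));
            dropPaths(Arrays.asList("/tmp/kylin/kylin-job-0001"),
                    FileSystem.get(hbaseClusterConf));
        }
    }

Keeping the two lists separate matters when the compute and HBase clusters use different HDFS namespaces, which appears to be the deployment the KYLIN-968-978 branch targets; a path handed to the wrong FileSystem would either fail to resolve or silently leave the real garbage behind.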


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bb1b1a72/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index ff79286..de75f7d 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.TimeZone;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
@@ -65,9 +63,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         final CubingJob result = initialJob(seg, "BUILD");
         final String jobId = result.getId();
         final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
+        final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+        final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();
 
         // cubing
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result);
+        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result, toDeletePathsOnHadoopCluster);
         String intermediateHiveTableStepId = twoSteps.getFirst().getId();
         String baseCuboidStepId = twoSteps.getSecond().getId();
 
@@ -79,7 +79,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
         final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
-        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, null));
+        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
 
         return result;
     }
@@ -92,9 +93,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         final String jobId = result.getId();
         final String appendRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/append_cuboid/";
         final String mergedRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/cuboid/";
+        List<String> mergingSegmentIds = Lists.newArrayList();
+        List<String> mergingCuboidPaths = Lists.newArrayList();
+        List<String> mergingHTables = Lists.newArrayList();
+        final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+        final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();
 
         // cubing the incremental segment
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result);
+        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result, toDeletePathsOnHadoopCluster);
         final String intermediateHiveTableStepId = twoSteps.getFirst().getId();
         final String baseCuboidStepId = twoSteps.getSecond().getId();
 
@@ -103,10 +109,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         List<CubeSegment> mergingSegments = mergeSegment.getCubeInstance().getMergingSegments(mergeSegment);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
-        List<String> mergingSegmentIds = Lists.newArrayList();
-        List<String> mergingCuboidPaths = Lists.newArrayList();
-        List<String> mergingHTables = Lists.newArrayList();
-        List<String> toDeletePaths = Lists.newArrayList();
+
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
             mergingHTables.add(merging.getStorageLocationIdentifier());
@@ -115,7 +118,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             } else {
                 mergingCuboidPaths.add(getPathToMerge(merging));
             }
-            toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
+            toDeletePathsOnHadoopCluster.add(getJobWorkingDir(merging.getLastBuildJobID()));
         }
 
         // merge cuboid
@@ -126,7 +129,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePaths));
+        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
 
         return result;
     }
@@ -143,12 +147,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         List<String> mergingSegmentIds = Lists.newArrayList();
         List<String> mergingCuboidPaths = Lists.newArrayList();
         List<String> mergingHTables = Lists.newArrayList();
-        List<String> toDeletePaths = Lists.newArrayList();
+        final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+        final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();
+
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
             mergingCuboidPaths.add(getPathToMerge(merging));
             mergingHTables.add(merging.getStorageLocationIdentifier());
-            toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
+            toDeletePathsOnHadoopCluster.add(getJobWorkingDir(merging.getLastBuildJobID()));
         }
 
         // merge cuboid
@@ -159,7 +165,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePaths));
+        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
         return result;
     }
 
@@ -171,7 +178,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         result.addTask(createMergeCuboidDataStep(seg, formattedPath, mergedCuboidPath));
     }
 
-    Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
+    Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result, List<String> toDeletePaths) {
         final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
         final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
 
@@ -199,6 +206,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             result.addTask(createNDimensionCuboidStep(seg, cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
         }
 
+        toDeletePaths.add(intermediateHiveTableLocation);
+        toDeletePaths.add(factDistinctColumnsPath);
+
         return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
     }
 
@@ -266,7 +276,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
     }
 
     private String getRowkeyDistributionOutputPath(CubeSegment seg, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
     }
 
     private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) {
@@ -460,12 +470,13 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return result;
     }
 
-    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdsfPaths) {
+    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdsfPaths, List<String> oldHdfsPathsOnHBaseCluster) {
         GarbageCollectionStep result = new GarbageCollectionStep();
         result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
         result.setOldHTables(oldHtables);
         result.setOldHiveTable(hiveIntermediateTable);
-        result.setOldHdsfPaths(oldHdsfPaths);
+        result.setOldHdfsPaths(oldHdsfPaths);
+        result.setOldHdfsPathsOnHBaseCluster(oldHdfsPathsOnHBaseCluster);
         return result;
     }
 

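One change in CubingJobBuilder above that is easy to miss: getRowkeyDistributionOutputPath is now qualified via HadoopUtil.makeQualifiedPathInHBaseCluster, presumably so the rowkey statistics end up on (and are later cleaned from) the HBase cluster's HDFS rather than the compute cluster's. The helper's body is not shown in this mail; the sketch below is only a hypothetical illustration of what qualifying a path against a particular cluster's default FileSystem amounts to.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class QualifyPathSketch {

        // Resolve a scheme-less path against a target cluster's default FileSystem
        // so the resulting string carries that cluster's scheme and authority
        // (for example hdfs://hbase-namenode:8020/...). Illustration only; this is
        // not Kylin's HadoopUtil.makeQualifiedPathInHBaseCluster implementation.
        static String makeQualified(String path, Configuration clusterConf) throws IOException {
            FileSystem fs = FileSystem.get(clusterConf);
            return fs.makeQualified(new Path(path)).toString();
        }

        public static void main(String[] args) throws IOException {
            Configuration hbaseClusterConf = new Configuration(); // hypothetical HBase-cluster settings
            System.out.println(makeQualified("/tmp/kylin/kylin-job-0001/cube/rowkey_stats", hbaseClusterConf));
        }
    }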
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bb1b1a72/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
index 72cad96..641454c 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -55,6 +55,8 @@ public class GarbageCollectionStep extends AbstractExecutable {
 
     private static final String OLD_HDFS_PATHS = "oldHdfsPaths";
 
+    private static final String OLD_HDFS_PATHS_ON_HBASE_CLUSTER = "oldHdfsPathsOnHBaseCluster";
+
     private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
 
     private StringBuffer output;
@@ -69,8 +71,9 @@ public class GarbageCollectionStep extends AbstractExecutable {
 
         try {
             dropHBaseTable(context);
-            dropHdfsPath(context);
             dropHiveTable(context);
+            dropHdfsPath(context);
+            dropHdfsPathOnHBaseCluster(context);
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
             output.append("\n").append(e.getLocalizedMessage());
@@ -131,13 +134,11 @@ public class GarbageCollectionStep extends AbstractExecutable {
             }
         }
     }
-    
-    private void dropHdfsPath(ExecutableContext context) throws IOException {
 
-        List<String> oldHdfsPaths = this.getOldHdsfPaths();
+    private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
         if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
-            FileSystem fileSystem = FileSystem.get(hconf);
+            logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri());
+            output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n");
             for (String path : oldHdfsPaths) {
                 if (path.endsWith("*"))
                     path = path.substring(0, path.length() - 1);
@@ -152,10 +153,21 @@ public class GarbageCollectionStep extends AbstractExecutable {
                     output.append("HDFS path not exists: \"" + path + "\" \n");
                 }
             }
-
         }
     }
 
+    private void dropHdfsPath(ExecutableContext context) throws IOException {
+        List<String> oldHdfsPaths = this.getOldHdfsPaths();
+        FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentConfiguration());
+        dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
+    }
+
+    private void dropHdfsPathOnHBaseCluster(ExecutableContext context) throws IOException {
+        List<String> oldHdfsPaths = this.getOldHdfsPathsOnHBaseCluster();
+        FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration());
+        dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
+    }
+
     public void setOldHTables(List<String> tables) {
         setArrayParam(OLD_HTABLES, tables);
     }
@@ -164,14 +176,22 @@ public class GarbageCollectionStep extends AbstractExecutable {
         return getArrayParam(OLD_HTABLES);
     }
 
-    public void setOldHdsfPaths(List<String> paths) {
+    public void setOldHdfsPaths(List<String> paths) {
         setArrayParam(OLD_HDFS_PATHS, paths);
     }
 
-    private List<String> getOldHdsfPaths() {
+    private List<String> getOldHdfsPaths() {
         return getArrayParam(OLD_HDFS_PATHS);
     }
 
+    public void setOldHdfsPathsOnHBaseCluster(List<String> paths) {
+        setArrayParam(OLD_HDFS_PATHS_ON_HBASE_CLUSTER, paths);
+    }
+
+    private List<String> getOldHdfsPathsOnHBaseCluster() {
+        return getArrayParam(OLD_HDFS_PATHS_ON_HBASE_CLUSTER);
+    }
+
     private void setArrayParam(String paramKey, List<String> paramValues) {
         setParam(paramKey, StringUtils.join(paramValues, ","));
     }