You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/09/23 05:29:18 UTC

[04/11] incubator-kylin git commit: KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path

KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/738422e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/738422e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/738422e1

Branch: refs/heads/KYLIN-1011
Commit: 738422e1030097d887956e769a4bf754b06b750c
Parents: 8581df4
Author: sunyerui <su...@gmail.com>
Authored: Fri Sep 18 19:11:31 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 22 11:19:53 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/JoinedFlatTable.java   |   2 +-
 .../apache/kylin/source/hive/HiveMRInput.java   |  19 ++-
 .../storage/hbase/steps/HBaseMROutput.java      |   5 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  50 +++++++
 .../steps/HDFSPathGarbageCollectionStep.java    | 138 +++++++++++++++++++
 5 files changed, 209 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 6ae3ccb..5886325 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -72,7 +72,7 @@ public class JoinedFlatTable {
 
         ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
         ddl.append("STORED AS SEQUENCEFILE" + "\n");
-        ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
+        ddl.append("LOCATION '" + getTableDir(intermediateTableDesc, storageDfsDir) + "';").append("\n");
         // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
         // ";\n");
         return ddl.toString();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 1dcdc94..4571852 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -21,6 +21,8 @@ package org.apache.kylin.source.hive;
 import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
@@ -134,6 +136,7 @@ public class HiveMRInput implements IMRInput {
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
             step.setIntermediateTableIdentity(getIntermediateTableIdentity());
+            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
             jobFlow.addTask(step);
         }
 
@@ -148,7 +151,6 @@ public class HiveMRInput implements IMRInput {
     }
 
     public static class GarbageCollectionStep extends AbstractExecutable {
-
         @Override
         protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
             KylinConfig config = context.getConfig();
@@ -161,6 +163,13 @@ public class HiveMRInput implements IMRInput {
                 try {
                     config.getCliCommandExecutor().execute(dropHiveCMD);
                     output.append("Hive table " + hiveTable + " is dropped. \n");
+
+                    Path externalDataPath = new Path(getExternalDataPath());
+                    FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+                    if (fs.exists(externalDataPath)) {
+                        fs.delete(externalDataPath, true);
+                        output.append("Hive table " + hiveTable + " external data path " + externalDataPath + " is deleted. \n");
+                    }
                 } catch (IOException e) {
                     logger.error("job:" + getId() + " execute finished with exception", e);
                     return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
@@ -177,6 +186,14 @@ public class HiveMRInput implements IMRInput {
         private String getIntermediateTableIdentity() {
             return getParam("oldHiveTable");
         }
+
+        public void setExternalDataPath(String externalDataPath) {
+            setParam("externalDataPath", externalDataPath);
+        }
+
+        private String getExternalDataPath() {
+            return getParam("externalDataPath");
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
index 8cbb7ff..c634a1d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
@@ -21,7 +21,6 @@ package org.apache.kylin.storage.hbase.steps;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMROutput;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.storage.hbase.steps.HBaseMRSteps;
 
 public class HBaseMROutput implements IMROutput {
 
@@ -37,7 +36,7 @@ public class HBaseMROutput implements IMROutput {
 
             @Override
             public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-                // nothing to do
+                steps.addCubingGarbageCollectionSteps(jobFlow);
             }
         };
     }
@@ -54,7 +53,7 @@ public class HBaseMROutput implements IMROutput {
 
             @Override
             public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createMergeGCStep());
+                steps.addMergingGarbageCollectionSteps(jobFlow);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index dfb4f33..f9e9b15 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.storage.hbase.steps;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
@@ -129,6 +130,16 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return mergingHTables;
     }
 
+    public List<String> getMergingHDFSPaths() {
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
+        final List<String> mergingHDFSPaths = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
+        }
+        return mergingHDFSPaths;
+    }
+
     public String getHFilePath(String jobId) {
         return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
     }
@@ -137,4 +148,43 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
     }
 
+    public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+        String jobId = jobFlow.getId();
+
+        jobFlow.addTask(createMergeGCStep());
+
+        List<String> toDeletePathsOnHadoopCluster = new ArrayList<>();
+        toDeletePathsOnHadoopCluster.addAll(getMergingHDFSPaths());
+
+        List<String> toDeletePathsOnHbaseCluster = new ArrayList<>();
+        toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
+        toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+
+        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
+        step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+        step.setJobId(jobId);
+
+        jobFlow.addTask(step);
+    }
+
+    public void addCubingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+        String jobId = jobFlow.getId();
+
+        List<String> toDeletePathsOnHadoopCluster = new ArrayList<>();
+        toDeletePathsOnHadoopCluster.add(getFactDistinctColumnsPath(jobId));
+
+        List<String> toDeletePathsOnHbaseCluster = new ArrayList<>();
+        toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
+        toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+
+        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
+        step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+        step.setJobId(jobId);
+
+        jobFlow.addTask(step);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/738422e1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
new file mode 100644
index 0000000..2ae8ca8
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hbase.steps;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Created by sunyerui on 15/9/17.
+ */
+public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
+
+    private StringBuffer output;
+    private JobEngineConfig config;
+
+    public HDFSPathGarbageCollectionStep() {
+        super();
+        output = new StringBuffer();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        try {
+            config = new JobEngineConfig(context.getConfig());
+            dropHdfsPathOnCluster(getDeletePathsOnHadoopCluster(), FileSystem.get(HadoopUtil.getCurrentConfiguration()));
+            dropHdfsPathOnCluster(getDeletePathsOnHBaseCluster(), FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration()));
+        } catch (IOException e) {
+            logger.error("job:" + getId() + " execute finished with exception", e);
+            output.append("\n").append(e.getLocalizedMessage());
+            return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+        }
+
+        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+    }
+
+    /** Recursively deletes each listed path; when a path's parent dir ends up empty, the job working dir is dropped too. */
+    private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
+        if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
+            logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri());
+            output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n");
+            for (String path : oldHdfsPaths) {
+                if (path.endsWith("*"))
+                    path = path.substring(0, path.length() - 1);
+                Path oldPath = new Path(path);
+                if (fileSystem.exists(oldPath)) {
+                    fileSystem.delete(oldPath, true);
+                    logger.debug("HDFS path " + path + " is dropped.");
+                    output.append("HDFS path " + path + " is dropped.\n");
+                } else {
+                    logger.debug("HDFS path " + path + " not exists.");
+                    output.append("HDFS path " + path + " not exists.\n");
+                }
+                // HBase on another cluster: job dir is empty once rowkey_stats/hfile are dropped; skip a missing parent.
+                Path parent = oldPath.getParent();
+                if (parent != null && fileSystem.exists(parent) && fileSystem.listStatus(parent).length == 0) {
+                    Path emptyJobPath = new Path(JobBuilderSupport.getJobWorkingDir(config, getJobId()));
+                    if (fileSystem.exists(emptyJobPath)) {
+                        fileSystem.delete(emptyJobPath, true);
+                        logger.debug("HDFS path " + emptyJobPath + " is empty and dropped.");
+                        output.append("HDFS path " + emptyJobPath + " is empty and dropped.\n");
+                    }
+                }
+            }
+        }
+    }
+
+    public void setDeletePathsOnHadoopCluster(List<String> deletePaths) {
+        setArrayParam("toDeletePathsOnHadoopCluster", deletePaths);
+    }
+
+    public void setDeletePathsOnHBaseCluster(List<String> deletePaths) {
+        setArrayParam("toDeletePathsOnHBaseCluster", deletePaths);
+    }
+
+    public void setJobId(String jobId) {
+        setParam("jobId", jobId);
+    }
+
+    public List<String> getDeletePathsOnHadoopCluster() {
+        return getArrayParam("toDeletePathsOnHadoopCluster");
+    }
+
+    public List<String> getDeletePathsOnHBaseCluster() {
+        return getArrayParam("toDeletePathsOnHBaseCluster");
+    }
+
+    public String getJobId() {
+        return getParam("jobId");
+    }
+
+    private void setArrayParam(String paramKey, List<String> paramValues) {
+        setParam(paramKey, StringUtils.join(paramValues, ","));
+    }
+
+    private List<String> getArrayParam(String paramKey) {
+        final String ids = getParam(paramKey);
+        if (ids != null) {
+            final String[] splitted = StringUtils.split(ids, ",");
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+            for (String id : splitted) {
+                result.add(id);
+            }
+            return result;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+}