Posted to commits@kylin.apache.org by sh...@apache.org on 2015/07/24 15:22:05 UTC

incubator-kylin git commit: KYLIN-805 Drop useless Hive intermediate table and HBase tables in the last step of cube build/merge

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.7-staging 059db2685 -> e3d42d3ca


KYLIN-805 Drop useless Hive intermediate table and HBase tables in the last step of cube build/merge

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e3d42d3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e3d42d3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e3d42d3c

Branch: refs/heads/0.7-staging
Commit: e3d42d3cac648f7c8cca9ddc6f27c4dc188d9f8f
Parents: 059db26
Author: shaofengshi <sh...@apache.org>
Authored: Fri Jul 24 21:19:29 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jul 24 21:19:40 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/constant/ExecutableConstants.java |   1 +
 .../apache/kylin/job/cube/CubingJobBuilder.java |  26 ++-
 .../kylin/job/cube/GarbageCollectionStep.java   | 199 +++++++++++++++++++
 .../job/hadoop/cube/MetadataCleanupJob.java     |   4 +-
 4 files changed, 226 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e3d42d3c/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 8ff577c..86a2e40 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -55,6 +55,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
+    public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
     
     public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
     public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e3d42d3c/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 4fec0b2..74ee876 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -74,6 +74,10 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         // update cube info
         result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, convertCuboidToHfileStep.getId(), jobId));
 
+        final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+        final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
+        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, null));
+        
         return result;
     }
 
@@ -97,12 +101,15 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         List<String> mergingSegmentIds = Lists.newArrayList();
         List<String> mergingCuboidPaths = Lists.newArrayList();
+        List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
-            if (merging.equals(appendSegment))
+            mergingHTables.add(merging.getStorageLocationIdentifier());
+            if (merging.equals(appendSegment)) {
                 mergingCuboidPaths.add(appendRootPath + "*");
-            else
+            } else {
                 mergingCuboidPaths.add(getPathToMerge(merging));
+            }
         }
 
         // merge cuboid
@@ -113,6 +120,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
+        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, mergingCuboidPaths));
         
         return result;
     }
@@ -128,9 +136,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         List<String> mergingSegmentIds = Lists.newArrayList();
         List<String> mergingCuboidPaths = Lists.newArrayList();
+        List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
             mergingCuboidPaths.add(getPathToMerge(merging));
+            mergingHTables.add(merging.getStorageLocationIdentifier());
         }
 
         // merge cuboid
@@ -141,7 +151,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-
+        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, mergingCuboidPaths));
         return result;
     }
 
@@ -442,4 +452,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return result;
     }
 
+    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdsfPaths) {
+        GarbageCollectionStep result = new GarbageCollectionStep();
+        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        result.setOldHTables(oldHtables);
+        result.setOldHiveTable(hiveIntermediateTable);
+        result.setOldHdsfPaths(oldHdsfPaths);
+        return result;
+    }
+
+
 }
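
For orientation, the builder change above appends one final task to both flows: a plain build hands the garbage-collection step only the intermediate Hive table name, while a merge hands it the merged-away segments' HTables and cuboid paths. A minimal, self-contained sketch of those three inputs in plain Java (hypothetical table and path names, not values from this commit):

    import java.util.Arrays;
    import java.util.List;

    public class GcInputsSketch {
        public static void main(String[] args) {
            // Build flow: only the intermediate Hive table becomes obsolete.
            String hiveIntermediateTable = "kylin_intermediate_test_cube_fact"; // hypothetical name
            List<String> buildHTables = null;   // nothing to drop in HBase after a plain build
            List<String> buildHdfsPaths = null; // and no old cuboid paths either

            // Merge flow: the merged-away segments leave HTables and cuboid paths behind.
            List<String> mergeHTables = Arrays.asList("KYLIN_SEG_A", "KYLIN_SEG_B");             // hypothetical
            List<String> mergeHdfsPaths = Arrays.asList("/kylin/cuboid/seg_a", "/kylin/cuboid/seg_b");

            System.out.println("build -> hiveTable=" + hiveIntermediateTable
                    + ", htables=" + buildHTables + ", paths=" + buildHdfsPaths);
            System.out.println("merge -> hiveTable=null, htables=" + mergeHTables
                    + ", paths=" + mergeHdfsPaths);
        }
    }

In the real builder the merge-flow values come from merging.getStorageLocationIdentifier() and getPathToMerge(merging), as the diff shows.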

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e3d42d3c/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
new file mode 100644
index 0000000..f95fc45
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cube;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Drop the resources that are no longer needed, including the intermediate Hive table (after cube build) and the HBase tables (after cube merge)
+ */
+public class GarbageCollectionStep extends AbstractExecutable {
+
+    private static final String OLD_HTABLES = "oldHTables";
+
+    private static final String OLD_HIVE_TABLE = "oldHiveTable";
+
+    private static final String OLD_HDFS_PATHS = "oldHdfsPaths";
+
+    private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
+
+    private StringBuffer output;
+
+    public GarbageCollectionStep() {
+        super();
+        output = new StringBuffer();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+
+        try {
+            dropHBaseTable(context);
+            dropHdfsPath(context);
+            dropHiveTable(context);
+        } catch (IOException e) {
+            logger.error("job:" + getId() + " execute finished with exception", e);
+            output.append("\n").append(e.getLocalizedMessage());
+            return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+        }
+
+        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+    }
+
+    private void dropHiveTable(ExecutableContext context) throws IOException {
+        final String hiveTable = this.getOldHiveTable();
+        if (StringUtils.isNotEmpty(hiveTable)) {
+            final String dropSQL = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";"
+                    + " DROP TABLE IF EXISTS  " + hiveTable + ";";
+            final String dropHiveCMD = "hive -e \"" + dropSQL + "\"";
+            ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
+            context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
+            output.append("Hive table " + hiveTable + " is dropped. \n");
+        }
+
+    }
+
+    private void dropHBaseTable(ExecutableContext context) throws IOException {
+        List<String> oldTables = getOldHTables();
+        if (oldTables != null && oldTables.size() > 0) {
+            String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+            Configuration conf = HBaseConfiguration.create();
+            HBaseAdmin admin = null;
+            try {
+                admin = new HBaseAdmin(conf);
+                for (String table : oldTables) {
+                    if (admin.tableExists(table)) {
+                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+                        String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
+                        if (metadataUrlPrefix.equalsIgnoreCase(host)) {
+                            if (admin.isTableEnabled(table)) {
+                                admin.disableTable(table);
+                            }
+                            admin.deleteTable(table);
+                            logger.debug("Dropped htable: " + table);
+                            output.append("HBase table " + table + " is dropped. \n");
+                        } else {
+                            logger.debug("Skip htable: " + table);
+                            output.append("Skip htable: " + table + ". \n");
+                        }
+                    }
+                }
+
+            } finally {
+                if (admin != null)
+                    try {
+                        admin.close();
+                    } catch (IOException e) {
+                        logger.error(e.getLocalizedMessage());
+                    }
+            }
+        }
+    }
+
+    private void dropHdfsPath(ExecutableContext context) throws IOException {
+
+        List<String> oldHdfsPaths = this.getOldHdsfPaths();
+        if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
+            Configuration hconf = HadoopUtil.getCurrentConfiguration();
+            FileSystem fileSystem = FileSystem.get(hconf);
+            for (String path : oldHdfsPaths) {
+                if (path.endsWith("*"))
+                    path = path.substring(0, path.length() - 1);
+                
+                Path oldPath = new Path(path);
+                if (fileSystem.exists(oldPath)) {
+                    fileSystem.delete(oldPath, true);
+                    logger.debug("Deleted path: " + path);
+                    output.append("Deleted path:  " + path + " \n");
+                } else {
+                    logger.debug("Path does not exist: " + path);
+                    output.append("Path does not exist: " + path + " \n");
+                }
+            }
+
+        }
+    }
+
+    public void setOldHTables(List<String> tables) {
+        setArrayParam(OLD_HTABLES, tables);
+    }
+
+    private List<String> getOldHTables() {
+        return getArrayParam(OLD_HTABLES);
+    }
+
+
+    public void setOldHdsfPaths(List<String> paths) {
+        setArrayParam(OLD_HDFS_PATHS, paths);
+    }
+
+    private List<String> getOldHdsfPaths() {
+        return getArrayParam(OLD_HDFS_PATHS);
+    }
+
+    private void setArrayParam(String paramKey, List<String> paramValues) {
+        setParam(paramKey, StringUtils.join(paramValues, ","));
+    }
+
+    private List<String> getArrayParam(String paramKey) {
+        final String ids = getParam(paramKey);
+        if (ids != null) {
+            final String[] splitted = StringUtils.split(ids, ",");
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+            for (String id : splitted) {
+                result.add(id);
+            }
+            return result;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    public void setOldHiveTable(String hiveTable) {
+        setParam(OLD_HIVE_TABLE, hiveTable);
+    }
+
+    private String getOldHiveTable() {
+        return getParam(OLD_HIVE_TABLE);
+    }
+
+}
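
Under the hood the step stores its list-valued inputs as a single comma-joined job parameter and splits them back out when the task runs (setArrayParam / getArrayParam above). A self-contained round-trip sketch of that encoding in plain Java, assuming, as the step effectively does, that HBase table names and cuboid paths never contain commas:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ArrayParamSketch {
        // Join a list into one comma-separated parameter value, mirroring setArrayParam().
        static String encode(List<String> values) {
            return String.join(",", values);
        }

        // Split the stored parameter back into a list, mirroring getArrayParam().
        static List<String> decode(String stored) {
            if (stored == null || stored.isEmpty()) {
                return Collections.emptyList();
            }
            return new ArrayList<String>(Arrays.asList(stored.split(",")));
        }

        public static void main(String[] args) {
            List<String> htables = Arrays.asList("KYLIN_SEG_A", "KYLIN_SEG_B"); // hypothetical names
            String stored = encode(htables);
            System.out.println(stored);          // KYLIN_SEG_A,KYLIN_SEG_B
            System.out.println(decode(stored));  // [KYLIN_SEG_A, KYLIN_SEG_B]
        }
    }

A missing parameter decodes to an empty list, which is why the drop methods can simply skip null or empty inputs.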

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e3d42d3c/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
index 1d899bb..397fe84 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 
 public class MetadataCleanupJob extends AbstractHadoopJob {
 
@@ -91,7 +93,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
     public void cleanup() throws Exception {
         CubeManager cubeManager = CubeManager.getInstance(config);
 
-        List<String> activeResourceList = Lists.newArrayList();
+        Set<String> activeResourceList = Sets.newHashSet();
         for (org.apache.kylin.cube.CubeInstance cube : cubeManager.listAllCubes()) {
             for (org.apache.kylin.cube.CubeSegment segment : cube.getSegments()) {
                 activeResourceList.addAll(segment.getSnapshotPaths());
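
The MetadataCleanupJob tweak above is small but deliberate: collecting active resource paths into a HashSet means a dictionary or table snapshot shared by several segments is recorded only once, and the later "is this stored resource still in use?" test is a constant-time contains() call. A standalone illustration with hypothetical resource paths:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ActiveResourceSketch {
        public static void main(String[] args) {
            // Two segments that happen to reference the same dictionary resource.
            List<String> seg1 = Arrays.asList("/dict/SITE_ID", "/table_snapshot/CAL_DT");
            List<String> seg2 = Arrays.asList("/dict/SITE_ID", "/table_snapshot/CATEGORY");

            Set<String> active = new HashSet<String>(); // was a List via Lists.newArrayList()
            active.addAll(seg1);
            active.addAll(seg2);

            System.out.println(active.size());                     // 3 - the shared dictionary counts once
            System.out.println(active.contains("/dict/SITE_ID"));  // true
        }
    }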