Posted to commits@kylin.apache.org by sh...@apache.org on 2015/07/24 15:22:05 UTC
incubator-kylin git commit: KYLIN-805 Drop useless Hive intermediate table and HBase tables in the last step of cube build/merge
Repository: incubator-kylin
Updated Branches:
refs/heads/0.7-staging 059db2685 -> e3d42d3ca
KYLIN-805 Drop useless Hive intermediate table and HBase tables in the last step of cube build/merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e3d42d3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e3d42d3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e3d42d3c
Branch: refs/heads/0.7-staging
Commit: e3d42d3cac648f7c8cca9ddc6f27c4dc188d9f8f
Parents: 059db26
Author: shaofengshi <sh...@apache.org>
Authored: Fri Jul 24 21:19:29 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jul 24 21:19:40 2015 +0800
----------------------------------------------------------------------
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../apache/kylin/job/cube/CubingJobBuilder.java | 26 ++-
.../kylin/job/cube/GarbageCollectionStep.java | 199 +++++++++++++++++++
.../job/hadoop/cube/MetadataCleanupJob.java | 4 +-
4 files changed, 226 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
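The change wires a new GarbageCollectionStep in as the final task of both the build and the merge job chains, so stale resources are only dropped once every earlier step has succeeded. A minimal, self-contained sketch of that ordering idea (the Task interface and step names below are illustrative stand-ins, not Kylin's real classes; Kylin's actual task abstraction is AbstractExecutable, shown in the diff that follows):

    import java.util.ArrayList;
    import java.util.List;

    public class LastStepCleanupSketch {

        // Illustrative stand-in for a job task.
        interface Task {
            void run() throws Exception;
        }

        public static void main(String[] args) throws Exception {
            List<Task> tasks = new ArrayList<>();
            tasks.add(() -> System.out.println("create flat table / build cuboids"));
            tasks.add(() -> System.out.println("convert cuboid data to HFile"));
            tasks.add(() -> System.out.println("update cube info"));
            // KYLIN-805: cleanup is appended last, so stale Hive/HBase/HDFS
            // resources are dropped only after the whole build/merge succeeded.
            tasks.add(() -> System.out.println("garbage collection"));

            for (Task t : tasks) {
                t.run(); // executed in order; a failure aborts before cleanup
            }
        }
    }

Because tasks run in order and a failure stops the chain, a cleanup task appended last can never delete artifacts that a retry of an earlier step might still need.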
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e3d42d3c/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 8ff577c..86a2e40 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -55,6 +55,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
+ public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e3d42d3c/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 4fec0b2..74ee876 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -74,6 +74,10 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
// update cube info
result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, convertCuboidToHfileStep.getId(), jobId));
+ final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+ final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
+ result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, null));
+
return result;
}
@@ -97,12 +101,15 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
List<String> mergingSegmentIds = Lists.newArrayList();
List<String> mergingCuboidPaths = Lists.newArrayList();
+ List<String> mergingHTables = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingSegmentIds.add(merging.getUuid());
- if (merging.equals(appendSegment))
+ mergingHTables.add(merging.getStorageLocationIdentifier());
+ if (merging.equals(appendSegment)) {
mergingCuboidPaths.add(appendRootPath + "*");
- else
+ } else {
mergingCuboidPaths.add(getPathToMerge(merging));
+ }
}
// merge cuboid
@@ -113,6 +120,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
// update cube info
result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
+ result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, mergingCuboidPaths));
return result;
}
@@ -128,9 +136,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
List<String> mergingSegmentIds = Lists.newArrayList();
List<String> mergingCuboidPaths = Lists.newArrayList();
+ List<String> mergingHTables = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingSegmentIds.add(merging.getUuid());
mergingCuboidPaths.add(getPathToMerge(merging));
+ mergingHTables.add(merging.getStorageLocationIdentifier());
}
// merge cuboid
@@ -141,7 +151,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
// update cube info
result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-
+ result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, mergingCuboidPaths));
return result;
}
@@ -442,4 +452,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
return result;
}
+ private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdfsPaths) {
+ GarbageCollectionStep result = new GarbageCollectionStep();
+ result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ result.setOldHTables(oldHtables);
+ result.setOldHiveTable(hiveIntermediateTable);
+ result.setOldHdfsPaths(oldHdfsPaths);
+ return result;
+ }
+
+
}
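Note the calling convention above: a plain build passes only the intermediate Hive table (HBase and HDFS arguments null), while the merge paths pass the obsolete HTables and cuboid paths (Hive argument null). A small self-contained sketch of that convention, using a hypothetical stand-in type with illustrative names and sample values:

    import java.util.Arrays;
    import java.util.List;

    public class GcStepConvention {

        // Hypothetical stand-in for GarbageCollectionStep's three inputs.
        static class GcInputs {
            final List<String> oldHTables;   // HBase tables to drop (merge)
            final String oldHiveTable;       // intermediate Hive table (build)
            final List<String> oldHdfsPaths; // merged cuboid paths (merge)

            GcInputs(List<String> hTables, String hiveTable, List<String> hdfsPaths) {
                this.oldHTables = hTables;
                this.oldHiveTable = hiveTable;
                this.oldHdfsPaths = hdfsPaths;
            }
        }

        public static void main(String[] args) {
            // After a build, only the flat Hive table is garbage.
            GcInputs afterBuild = new GcInputs(null, "kylin_intermediate_some_cube_jobid", null);

            // After a merge, the merged segments' HTables and cuboid paths are garbage.
            GcInputs afterMerge = new GcInputs(
                    Arrays.asList("KYLIN_HTABLE_A", "KYLIN_HTABLE_B"),
                    null,
                    Arrays.asList("/kylin/cuboid/segment_a", "/kylin/cuboid/segment_b"));

            System.out.println(afterBuild.oldHiveTable);
            System.out.println(afterMerge.oldHTables + " " + afterMerge.oldHdfsPaths);
        }
    }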
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e3d42d3c/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
new file mode 100644
index 0000000..f95fc45
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cube;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Drop resources that are no longer needed, including the intermediate Hive table (after a cube build) and HBase tables (after a cube merge)
+ */
+public class GarbageCollectionStep extends AbstractExecutable {
+
+ private static final String OLD_HTABLES = "oldHTables";
+
+ private static final String OLD_HIVE_TABLE = "oldHiveTable";
+
+ private static final String OLD_HDFS_PATHS = "oldHdfsPaths";
+
+ private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
+
+ private StringBuffer output;
+
+ public GarbageCollectionStep() {
+ super();
+ output = new StringBuffer();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+
+ try {
+ dropHBaseTable(context);
+ dropHdfsPath(context);
+ dropHiveTable(context);
+ } catch (IOException e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ output.append("\n").append(e.getLocalizedMessage());
+ return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ }
+
+ private void dropHiveTable(ExecutableContext context) throws IOException {
+ final String hiveTable = this.getOldHiveTable();
+ if (StringUtils.isNotEmpty(hiveTable)) {
+ final String dropSQL = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";"
+ + " DROP TABLE IF EXISTS " + hiveTable + ";";
+ final String dropHiveCMD = "hive -e \"" + dropSQL + "\"";
+ ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
+ context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
+ output.append("Hive table " + hiveTable + " is dropped. \n");
+ }
+
+ }
+
+ private void dropHBaseTable(ExecutableContext context) throws IOException {
+ List<String> oldTables = getOldHTables();
+ if (oldTables != null && oldTables.size() > 0) {
+ String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+ Configuration conf = HBaseConfiguration.create();
+ HBaseAdmin admin = null;
+ try {
+ admin = new HBaseAdmin(conf);
+ for (String table : oldTables) {
+ if (admin.tableExists(table)) {
+ HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+ String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
+ if (metadataUrlPrefix.equalsIgnoreCase(host)) {
+ if (admin.isTableEnabled(table)) {
+ admin.disableTable(table);
+ }
+ admin.deleteTable(table);
+ logger.debug("Dropped htable: " + table);
+ output.append("HBase table " + table + " is dropped. \n");
+ } else {
+ logger.debug("Skip htable: " + table);
+ output.append("Skip htable: " + table + ". \n");
+ }
+ }
+ }
+
+ } finally {
+ if (admin != null)
+ try {
+ admin.close();
+ } catch (IOException e) {
+ logger.error(e.getLocalizedMessage());
+ }
+ }
+ }
+ }
+
+ private void dropHdfsPath(ExecutableContext context) throws IOException {
+
+ List<String> oldHdfsPaths = this.getOldHdfsPaths();
+ if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
+ Configuration hconf = HadoopUtil.getCurrentConfiguration();
+ FileSystem fileSystem = FileSystem.get(hconf);
+ for (String path : oldHdfsPaths) {
+ if (path.endsWith("*"))
+ path = path.substring(0, path.length() - 1);
+
+ Path oldPath = new Path(path);
+ if (fileSystem.exists(oldPath)) {
+ fileSystem.delete(oldPath, true);
+ logger.debug("Deleted path: " + path);
+ output.append("Deleted path: " + path + " \n");
+ } else {
+ logger.debug("Path not exists: " + path);
+ output.append("Path not exists: " + path + " \n");
+ }
+ }
+
+ }
+ }
+
+ public void setOldHTables(List<String> tables) {
+ setArrayParam(OLD_HTABLES, tables);
+ }
+
+ private List<String> getOldHTables() {
+ return getArrayParam(OLD_HTABLES);
+ }
+
+
+ public void setOldHdfsPaths(List<String> paths) {
+ setArrayParam(OLD_HDFS_PATHS, paths);
+ }
+
+ private List<String> getOldHdfsPaths() {
+ return getArrayParam(OLD_HDFS_PATHS);
+ }
+
+ private void setArrayParam(String paramKey, List<String> paramValues) {
+ setParam(paramKey, StringUtils.join(paramValues, ","));
+ }
+
+ private List<String> getArrayParam(String paramKey) {
+ final String ids = getParam(paramKey);
+ if (ids != null) {
+ final String[] splitted = StringUtils.split(ids, ",");
+ ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+ for (String id : splitted) {
+ result.add(id);
+ }
+ return result;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ public void setOldHiveTable(String hiveTable) {
+ setParam(OLD_HIVE_TABLE, hiveTable);
+ }
+
+ private String getOldHiveTable() {
+ return getParam(OLD_HIVE_TABLE);
+ }
+
+}
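The step persists its inputs through the executable's string parameters, so the lists are round-tripped via a comma join in setArrayParam/getArrayParam. A self-contained sketch of that encoding using plain JDK calls instead of commons-lang (the map and key names are illustrative); note it assumes no value contains a comma, which holds for HBase table names and the HDFS paths involved here:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ArrayParamRoundTrip {

        // Stand-in for the executable's string parameter store.
        static final Map<String, String> params = new HashMap<>();

        static void setArrayParam(String key, List<String> values) {
            // Same idea as StringUtils.join(values, ",") in the step above.
            params.put(key, String.join(",", values));
        }

        static List<String> getArrayParam(String key) {
            String joined = params.get(key);
            if (joined == null) {
                return Collections.emptyList();
            }
            // Assumes no value contains a comma.
            return Arrays.asList(joined.split(","));
        }

        public static void main(String[] args) {
            setArrayParam("oldHTables", Arrays.asList("HTABLE_A", "HTABLE_B"));
            System.out.println(getArrayParam("oldHTables")); // [HTABLE_A, HTABLE_B]
        }
    }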
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e3d42d3c/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
index 1d899bb..397fe84 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
@@ -19,6 +19,7 @@
package org.apache.kylin.job.hadoop.cube;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Set;
public class MetadataCleanupJob extends AbstractHadoopJob {
@@ -91,7 +93,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
public void cleanup() throws Exception {
CubeManager cubeManager = CubeManager.getInstance(config);
- List<String> activeResourceList = Lists.newArrayList();
+ Set<String> activeResourceList = Sets.newHashSet();
for (org.apache.kylin.cube.CubeInstance cube : cubeManager.listAllCubes()) {
for (org.apache.kylin.cube.CubeSegment segment : cube.getSegments()) {
activeResourceList.addAll(segment.getSnapshotPaths());
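For MetadataCleanupJob, swapping the List for a Set fits the access pattern: segments from different cubes can reference the same snapshot or dictionary path, and the cleanup pass repeatedly asks whether a candidate resource is still active. A brief self-contained sketch of why a HashSet is the better container here (the paths are illustrative):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ActiveResourceLookup {
        public static void main(String[] args) {
            // A HashSet collapses duplicate resource paths on insert and
            // answers contains() in O(1), versus O(n) per lookup on a List.
            Set<String> active = new HashSet<>();
            active.addAll(Arrays.asList("/dict/date_dim", "/table_snapshot/s1"));
            active.addAll(Arrays.asList("/dict/date_dim", "/table_snapshot/s2"));

            List<String> candidates = Arrays.asList("/dict/date_dim", "/dict/stale");
            for (String resource : candidates) {
                System.out.println(resource + (active.contains(resource) ? " -> keep" : " -> cleanup"));
            }
        }
    }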