You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/11/02 03:14:03 UTC
kylin git commit: StorageCleanupJob: delete intermediate tables due to kylin.hive.keep.flat.table=true
Repository: kylin
Updated Branches:
refs/heads/yang21 220697402 -> 91ffb47fa
StorageCleanupJob: delete intermediate tables due to kylin.hive.keep.flat.table=true
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/91ffb47f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/91ffb47f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/91ffb47f
Branch: refs/heads/yang21
Commit: 91ffb47fa8bcb709c0b1e0857dcdb6094b9d3a08
Parents: 2206974
Author: Hongbin Ma <ma...@apache.org>
Authored: Wed Nov 2 11:13:24 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Nov 2 11:14:14 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/tool/StorageCleanupJob.java | 61 ++++++++++++++++----
1 file changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/91ffb47f/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 56681af..2a2d1f3 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -21,6 +21,7 @@ package org.apache.kylin.tool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -49,6 +50,7 @@ import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
@@ -63,6 +65,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
public class StorageCleanupJob extends AbstractApplication {
@@ -252,6 +255,7 @@ public class StorageCleanupJob extends AbstractApplication {
private void cleanUnusedIntermediateHiveTable(Configuration conf) throws Exception {
final KylinConfig config = KylinConfig.getInstanceFromEnv();
+ JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
final int uuidLength = 36;
final String preFix = "kylin_intermediate_";
@@ -269,6 +273,7 @@ public class StorageCleanupJob extends AbstractApplication {
List<String> allJobs = executableManager.getAllJobIds();
List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
List<String> workingJobList = new ArrayList<String>();
+ Map<String, String> segmentId2JobId = Maps.newHashMap();
StringBuilder sb = new StringBuilder();
for (String jobId : allJobs) {
@@ -278,6 +283,11 @@ public class StorageCleanupJob extends AbstractApplication {
workingJobList.add(jobId);
sb.append(jobId).append("(").append(state).append("), ");
}
+
+ String segmentId = getSegmentIdFromJobId(jobId);
+ if (segmentId != null) {//some jobs are not cubing jobs
+ segmentId2JobId.put(segmentId, jobId);
+ }
}
logger.info("Working jobIDs: " + workingJobList);
@@ -302,15 +312,15 @@ public class StorageCleanupJob extends AbstractApplication {
if (UUId_PATTERN.matcher(uuid).matches()) {
//Check whether it's a hive table in use
if (isTableInUse(uuid, workingJobList)) {
- logger.info("Skip because not isTableInUse");
+ logger.info("Skip deleting because the table is in use");
isNeedDel = false;
}
} else {
- logger.info("Skip because not match pattern");
+ logger.info("Skip deleting because not match pattern");
isNeedDel = false;
}
} else {
- logger.info("Skip because length not qualified");
+ logger.info("Skip deleting because length not qualified");
isNeedDel = false;
}
@@ -320,19 +330,41 @@ public class StorageCleanupJob extends AbstractApplication {
}
if (delete == true) {
- final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- hiveCmdBuilder.addStatement(useDatabaseHql);
- for (String delHive : allHiveTablesNeedToBeDeleted) {
- hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; ");
- logger.info("Remove " + delHive + " from hive tables.");
- }
try {
+ final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ hiveCmdBuilder.addStatement(useDatabaseHql);
+ for (String delHive : allHiveTablesNeedToBeDeleted) {
+ hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; ");
+ logger.info("Remove " + delHive + " from hive tables.");
+ }
cmdExec.execute(hiveCmdBuilder.build());
+
+ //if kylin.hive.keep.flat.table, some intermediate table might be kept
+ //delete external path
+ for (String tableToDelete : allHiveTablesNeedToBeDeleted) {
+ String uuid = tableToDelete.substring(tableToDelete.length() - uuidLength, tableToDelete.length());
+ String segmentId = uuid.replace("_", "-");
+
+ if (segmentId2JobId.containsKey(segmentId)) {
+ String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), segmentId2JobId.get(segmentId)) + "/" + tableToDelete;
+ Path externalDataPath = new Path(path);
+ FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+ if (fs.exists(externalDataPath)) {
+ fs.delete(externalDataPath, true);
+ logger.info("Hive table {}'s external path {} deleted", tableToDelete, path);
+ } else {
+ logger.info("Hive table {}'s external path {} not exist. It's normal if kylin.hive.keep.flat.table set false (By default)", tableToDelete, path);
+ }
+ } else {
+ logger.warn("Hive table {}'s job ID not found, segmentId2JobId: {}", tableToDelete, segmentId2JobId.toString());
+ }
+ }
} catch (IOException e) {
e.printStackTrace();
}
+
} else {
System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
for (String hiveTable : allHiveTablesNeedToBeDeleted) {
@@ -342,10 +374,15 @@ public class StorageCleanupJob extends AbstractApplication {
}
}
+ private String getSegmentIdFromJobId(String jobId) {
+ AbstractExecutable abstractExecutable = executableManager.getJob(jobId);
+ String segmentId = abstractExecutable.getParam("segmentId");
+ return segmentId;
+ }
+
private boolean isTableInUse(String segUuid, List<String> workingJobList) {
for (String jobId : workingJobList) {
- AbstractExecutable abstractExecutable = executableManager.getJob(jobId);
- String segmentId = abstractExecutable.getParam("segmentId");
+ String segmentId = getSegmentIdFromJobId(jobId);
if (null == segmentId)
continue;