You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/27 00:34:48 UTC
[24/50] [abbrv] kylin git commit: KYLIN-1828 StorageCleanupJob
KYLIN-1828 StorageCleanupJob
Signed-off-by: Li Yang <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/858fad67
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/858fad67
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/858fad67
Branch: refs/heads/1.5.x-CDH5.7
Commit: 858fad676bb42366c26f4033fcc208675cb4bf72
Parents: fa5c9cb
Author: Cheng Wang <ch...@kyligence.io>
Authored: Fri Sep 23 17:32:24 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Sep 23 17:44:23 2016 +0800
----------------------------------------------------------------------
.../storage/hbase/util/StorageCleanupJob.java | 57 ++++++++++++++++----
1 file changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/858fad67/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 4bd2c53..dffce36 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
@@ -51,6 +52,7 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.realization.IRealizationConstants;
@@ -61,11 +63,13 @@ public class StorageCleanupJob extends AbstractApplication {
@SuppressWarnings("static-access")
protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
+ protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete any intermediate hive tables").create("force");
protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // Unit minute
protected boolean delete = false;
+ protected boolean force = false;
protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
@@ -132,6 +136,7 @@ public class StorageCleanupJob extends AbstractApplication {
protected Options getOptions() {
Options options = new Options();
options.addOption(OPTION_DELETE);
+ options.addOption(OPTION_FORCE); // -force true: every kylin_intermediate_* hive table is treated as deletable, skipping the running-job (in-use) check
return options;
}
@@ -139,7 +144,9 @@ public class StorageCleanupJob extends AbstractApplication {
protected void execute(OptionsHelper optionsHelper) throws Exception {
logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
logger.info("delete option value: '" + optionsHelper.getOptionValue(OPTION_DELETE) + "'");
+ logger.info("force option value: '" + optionsHelper.getOptionValue(OPTION_FORCE) + "'");
delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE));
+ force = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FORCE));
Configuration conf = HBaseConfiguration.create();
@@ -183,6 +190,7 @@ public class StorageCleanupJob extends AbstractApplication {
// GlobFilter filter = new
// GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
// + "/kylin-.*");
+ // TODO: when first use, /kylin/kylin_metadata does not exist.
FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
for (FileStatus status : fStatus) {
String path = status.getPath().getName();
@@ -242,6 +250,8 @@ public class StorageCleanupJob extends AbstractApplication {
final KylinConfig config = KylinConfig.getInstanceFromEnv();
final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
final int uuidLength = 36;
+ final String preFix = "kylin_intermediate_";
+ final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
@@ -260,7 +270,6 @@ public class StorageCleanupJob extends AbstractApplication {
for (String jobId : allJobs) {
// only remove FINISHED and DISCARDED job intermediate table
final ExecutableState state = executableManager.getOutput(jobId).getState();
-
if (!state.isFinalState()) {
workingJobList.add(jobId);
logger.info("Skip intermediate hive table with job id " + jobId + " with job status " + state);
@@ -268,18 +277,35 @@ public class StorageCleanupJob extends AbstractApplication {
}
while ((line = reader.readLine()) != null) {
- if (line.startsWith("kylin_intermediate_")) {
- boolean isNeedDel = false;
+ if (!line.startsWith(preFix))
+ continue;
+
+ if (force == true) {
+ logger.warn("!!!!!!!!!!!!!!!Warning: will delete all intermediate hive tables!!!!!!!!!!!!!!!!!!!!!!");
+ allHiveTablesNeedToBeDeleted.add(line);
+ continue;
+ }
+
+ boolean isNeedDel = true;
+
+ if (line.length() > preFix.length() + uuidLength) {
String uuid = line.substring(line.length() - uuidLength, line.length());
uuid = uuid.replace("_", "-");
- //Check whether it's a hive table in use
- if (allJobs.contains(uuid) && !workingJobList.contains(uuid)) {
- isNeedDel = true;
+ final Pattern UUId_PATTERN = Pattern.compile(uuidPattern);
+ if (UUId_PATTERN.matcher(uuid).matches()) {
+ //Check whether it's a hive table in use
+ if (isTableInUse(uuid, workingJobList)) {
+ isNeedDel = false;
+ }
+ } else {
+ isNeedDel = false;
}
+ } else {
+ isNeedDel = false;
+ }
- if (isNeedDel) {
- allHiveTablesNeedToBeDeleted.add(line);
- }
+ if (isNeedDel) {
+ allHiveTablesNeedToBeDeleted.add(line);
}
}
@@ -308,6 +334,19 @@ public class StorageCleanupJob extends AbstractApplication {
reader.close();
}
+ /**
+ * Tells whether an intermediate hive table belongs to a segment that a working job still uses.
+ *
+ * @param segUuid segment id recovered from the intermediate table name
+ * @param workingJobList ids of jobs whose state is not final (see the caller)
+ * @return true if ANY job in workingJobList has a "segmentId" param equal to segUuid
+ */
+ private boolean isTableInUse(String segUuid, List<String> workingJobList) {
+ for (String jobId : workingJobList) {
+ AbstractExecutable abstractExecutable = executableManager.getJob(jobId);
+ // Defensive: skip a job that can no longer be loaded instead of failing with an NPE.
+ if (null == abstractExecutable)
+ continue;
+
+ String segmentId = abstractExecutable.getParam("segmentId");
+ if (null == segmentId)
+ continue;
+
+ // Only a match is conclusive; a mismatch says nothing about the remaining
+ // working jobs, so keep scanning instead of returning the comparison result.
+ if (segUuid.equals(segmentId))
+ return true;
+ }
+ return false;
+ }
+
public static void main(String[] args) throws Exception {
StorageCleanupJob cli = new StorageCleanupJob();
cli.execute(args);