You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/27 00:34:48 UTC

[24/50] [abbrv] kylin git commit: KYLIN-1828 StorageCleanupJob

KYLIN-1828 StorageCleanupJob

Signed-off-by: Li Yang <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/858fad67
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/858fad67
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/858fad67

Branch: refs/heads/1.5.x-CDH5.7
Commit: 858fad676bb42366c26f4033fcc208675cb4bf72
Parents: fa5c9cb
Author: Cheng Wang <ch...@kyligence.io>
Authored: Fri Sep 23 17:32:24 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Sep 23 17:44:23 2016 +0800

----------------------------------------------------------------------
 .../storage/hbase/util/StorageCleanupJob.java   | 57 ++++++++++++++++----
 1 file changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/858fad67/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 4bd2c53..dffce36 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -51,6 +52,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
@@ -61,11 +63,13 @@ public class StorageCleanupJob extends AbstractApplication {
 
     @SuppressWarnings("static-access")
     protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
+    protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete any intermediate hive tables").create("force");
 
     protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
     public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // Unit minute
 
     protected boolean delete = false;
+    protected boolean force = false;
     protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
 
     private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
@@ -132,6 +136,7 @@ public class StorageCleanupJob extends AbstractApplication {
     protected Options getOptions() {
         Options options = new Options();
         options.addOption(OPTION_DELETE);
+        options.addOption(OPTION_FORCE);
         return options;
     }
 
@@ -139,7 +144,9 @@ public class StorageCleanupJob extends AbstractApplication {
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
         logger.info("delete option value: '" + optionsHelper.getOptionValue(OPTION_DELETE) + "'");
+        logger.info("force option value: '" + optionsHelper.getOptionValue(OPTION_FORCE) + "'");
         delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE));
+        force = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FORCE));
 
         Configuration conf = HBaseConfiguration.create();
 
@@ -183,6 +190,7 @@ public class StorageCleanupJob extends AbstractApplication {
         // GlobFilter filter = new
         // GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
         // + "/kylin-.*");
+        // TODO: when first use, /kylin/kylin_metadata does not exist.
         FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
         for (FileStatus status : fStatus) {
             String path = status.getPath().getName();
@@ -242,6 +250,8 @@ public class StorageCleanupJob extends AbstractApplication {
         final KylinConfig config = KylinConfig.getInstanceFromEnv();
         final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
         final int uuidLength = 36;
+        final String preFix = "kylin_intermediate_";
+        final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
 
         final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
         final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
@@ -260,7 +270,6 @@ public class StorageCleanupJob extends AbstractApplication {
         for (String jobId : allJobs) {
             // only remove FINISHED and DISCARDED job intermediate table
             final ExecutableState state = executableManager.getOutput(jobId).getState();
-
             if (!state.isFinalState()) {
                 workingJobList.add(jobId);
                 logger.info("Skip intermediate hive table with job id " + jobId + " with job status " + state);
@@ -268,18 +277,35 @@ public class StorageCleanupJob extends AbstractApplication {
         }
 
         while ((line = reader.readLine()) != null) {
-            if (line.startsWith("kylin_intermediate_")) {
-                boolean isNeedDel = false;
+            if (!line.startsWith(preFix))
+                continue;
+
+            if (force == true) {
+                logger.warn("!!!!!!!!!!!!!!!Warning: will delete all intermediate hive tables!!!!!!!!!!!!!!!!!!!!!!");
+                allHiveTablesNeedToBeDeleted.add(line);
+                continue;
+            }
+
+            boolean isNeedDel = true;
+
+            if (line.length() > preFix.length() + uuidLength) {
                 String uuid = line.substring(line.length() - uuidLength, line.length());
                 uuid = uuid.replace("_", "-");
-                //Check whether it's a hive table in use
-                if (allJobs.contains(uuid) && !workingJobList.contains(uuid)) {
-                    isNeedDel = true;
+                final Pattern UUId_PATTERN = Pattern.compile(uuidPattern);
+                if (UUId_PATTERN.matcher(uuid).matches()) {
+                    //Check whether it's a hive table in use
+                    if (isTableInUse(uuid, workingJobList)) {
+                        isNeedDel = false;
+                    }
+                } else {
+                    isNeedDel = false;
                 }
+            } else {
+                isNeedDel = false;
+            }
 
-                if (isNeedDel) {
-                    allHiveTablesNeedToBeDeleted.add(line);
-                }
+            if (isNeedDel) {
+                allHiveTablesNeedToBeDeleted.add(line);
             }
         }
 
@@ -308,6 +334,19 @@ public class StorageCleanupJob extends AbstractApplication {
             reader.close();
     }
 
+    /** Returns true when any job in workingJobList has a "segmentId" param equal to segUuid. */
+    private boolean isTableInUse(String segUuid, List<String> workingJobList) {
+        for (String jobId : workingJobList) {
+            AbstractExecutable abstractExecutable = executableManager.getJob(jobId);
+            // A null segmentId never equals segUuid, so such jobs are skipped.
+            String segmentId = abstractExecutable.getParam("segmentId");
+            // Keep scanning on a mismatch: a later working job may still own this table.
+            if (segUuid.equals(segmentId)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public static void main(String[] args) throws Exception {
         StorageCleanupJob cli = new StorageCleanupJob();
         cli.execute(args);