Posted to commits@kylin.apache.org by ma...@apache.org on 2016/11/02 03:14:03 UTC

kylin git commit: StorageCleanupJob: delete intermediate tables due to kylin.hive.keep.flat.table=true

Repository: kylin
Updated Branches:
  refs/heads/yang21 220697402 -> 91ffb47fa


StorageCleanupJob: delete intermediate tables due to kylin.hive.keep.flat.table=true
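
For context: when kylin.hive.keep.flat.table=true, the intermediate flat
table and its external HDFS data survive the cubing job, so the cleanup
job must map each leftover table back to the job that created it and
delete the external path itself. A minimal sketch of that mapping,
assuming the 36-character UUID suffix used in the patch and a
"kylin-<jobId>" working directory layout as built by
JobBuilderSupport.getJobWorkingDir (the sample values below are
hypothetical):

    // Sketch only: mirrors the table-name -> segment-id -> external-path
    // logic in the patch; not the actual Kylin implementation.
    public class ExternalPathSketch {
        private static final int UUID_LENGTH = 36;

        // kylin_intermediate_<cube>_<uuid with '_'> -> segment UUID with '-'
        static String segmentIdOf(String intermediateTable) {
            String uuid = intermediateTable.substring(intermediateTable.length() - UUID_LENGTH);
            return uuid.replace("_", "-"); // Hive table names cannot contain '-'
        }

        // Assumption: the job working dir is <hdfsWorkingDir>kylin-<jobId>,
        // matching what JobBuilderSupport.getJobWorkingDir produces in the patch.
        static String externalPathOf(String hdfsWorkingDir, String jobId, String table) {
            return hdfsWorkingDir + "kylin-" + jobId + "/" + table;
        }

        public static void main(String[] args) {
            String table = "kylin_intermediate_sample_cube_0a1b2c3d_4e5f_6a7b_8c9d_0e1f2a3b4c5d";
            System.out.println(segmentIdOf(table)); // 0a1b2c3d-4e5f-6a7b-8c9d-0e1f2a3b4c5d
            System.out.println(externalPathOf("/kylin/kylin_metadata/", "job-123", table));
        }
    }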


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/91ffb47f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/91ffb47f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/91ffb47f

Branch: refs/heads/yang21
Commit: 91ffb47fa8bcb709c0b1e0857dcdb6094b9d3a08
Parents: 2206974
Author: Hongbin Ma <ma...@apache.org>
Authored: Wed Nov 2 11:13:24 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Nov 2 11:14:14 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/tool/StorageCleanupJob.java    | 61 ++++++++++++++++----
 1 file changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/91ffb47f/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 56681af..2a2d1f3 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -21,6 +21,7 @@ package org.apache.kylin.tool;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -49,6 +50,7 @@ import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -63,6 +65,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 
 public class StorageCleanupJob extends AbstractApplication {
 
@@ -252,6 +255,7 @@ public class StorageCleanupJob extends AbstractApplication {
 
     private void cleanUnusedIntermediateHiveTable(Configuration conf) throws Exception {
         final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
         final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
         final int uuidLength = 36;
         final String preFix = "kylin_intermediate_";
@@ -269,6 +273,7 @@ public class StorageCleanupJob extends AbstractApplication {
         List<String> allJobs = executableManager.getAllJobIds();
         List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
         List<String> workingJobList = new ArrayList<String>();
+        Map<String, String> segmentId2JobId = Maps.newHashMap();
 
         StringBuilder sb = new StringBuilder();
         for (String jobId : allJobs) {
@@ -278,6 +283,11 @@ public class StorageCleanupJob extends AbstractApplication {
                 workingJobList.add(jobId);
                 sb.append(jobId).append("(").append(state).append("), ");
             }
+
+            String segmentId = getSegmentIdFromJobId(jobId);
+            if (segmentId != null) { // some jobs are not cubing jobs
+                segmentId2JobId.put(segmentId, jobId);
+            }
         }
         logger.info("Working jobIDs: " + workingJobList);
 
@@ -302,15 +312,15 @@ public class StorageCleanupJob extends AbstractApplication {
                 if (UUId_PATTERN.matcher(uuid).matches()) {
                     //Check whether it's a hive table in use
                     if (isTableInUse(uuid, workingJobList)) {
-                        logger.info("Skip because not isTableInUse");
+                        logger.info("Skip deleting because the table is in use");
                         isNeedDel = false;
                     }
                 } else {
-                    logger.info("Skip because not match pattern");
+                    logger.info("Skip deleting because not match pattern");
                     isNeedDel = false;
                 }
             } else {
-                logger.info("Skip because length not qualified");
+                logger.info("Skip deleting because length not qualified");
                 isNeedDel = false;
             }
 
@@ -320,19 +330,41 @@ public class StorageCleanupJob extends AbstractApplication {
         }
 
         if (delete == true) {
-            final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.addStatement(useDatabaseHql);
-            for (String delHive : allHiveTablesNeedToBeDeleted) {
-                hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; ");
-                logger.info("Remove " + delHive + " from hive tables.");
-            }
 
             try {
+                final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
+                final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+                hiveCmdBuilder.addStatement(useDatabaseHql);
+                for (String delHive : allHiveTablesNeedToBeDeleted) {
+                    hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; ");
+                    logger.info("Remove " + delHive + " from hive tables.");
+                }
                 cmdExec.execute(hiveCmdBuilder.build());
+
+                //if kylin.hive.keep.flat.table=true, some intermediate tables may have been kept
+                //after the job finished, so delete their external HDFS paths as well
+                for (String tableToDelete : allHiveTablesNeedToBeDeleted) {
+                    String uuid = tableToDelete.substring(tableToDelete.length() - uuidLength, tableToDelete.length());
+                    String segmentId = uuid.replace("_", "-");
+
+                    if (segmentId2JobId.containsKey(segmentId)) {
+                        String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), segmentId2JobId.get(segmentId)) + "/" + tableToDelete;
+                        Path externalDataPath = new Path(path);
+                        FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+                        if (fs.exists(externalDataPath)) {
+                            fs.delete(externalDataPath, true);
+                            logger.info("Hive table {}'s external path {} deleted", tableToDelete, path);
+                        } else {
+                            logger.info("Hive table {}'s external path {} not exist. It's normal if kylin.hive.keep.flat.table set false (By default)", tableToDelete, path);
+                        }
+                    } else {
+                        logger.warn("Hive table {}'s job ID not found, segmentId2JobId: {}", tableToDelete, segmentId2JobId.toString());
+                    }
+                }
             } catch (IOException e) {
                 e.printStackTrace();
             }
+
         } else {
             System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
             for (String hiveTable : allHiveTablesNeedToBeDeleted) {
@@ -342,10 +374,15 @@ public class StorageCleanupJob extends AbstractApplication {
         }
     }
 
+    private String getSegmentIdFromJobId(String jobId) {
+        AbstractExecutable abstractExecutable = executableManager.getJob(jobId);
+        String segmentId = abstractExecutable.getParam("segmentId");
+        return segmentId;
+    }
+
     private boolean isTableInUse(String segUuid, List<String> workingJobList) {
         for (String jobId : workingJobList) {
-            AbstractExecutable abstractExecutable = executableManager.getJob(jobId);
-            String segmentId = abstractExecutable.getParam("segmentId");
+            String segmentId = getSegmentIdFromJobId(jobId);
 
             if (null == segmentId)
                 continue;