Posted to commits@kylin.apache.org by nj...@apache.org on 2016/06/17 17:50:39 UTC

kylin git commit: KYLIN-1805: Fix getting stuck when deleting HTables while running the StorageCleanupJob

Repository: kylin
Updated Branches:
  refs/heads/1.4-rc d138d174d -> 922c9c41f


KYLIN-1805: Fix getting stuck when deleting HTables while running the StorageCleanupJob


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/922c9c41
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/922c9c41
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/922c9c41

Branch: refs/heads/1.4-rc
Commit: 922c9c41fe6fe1a26f194addefba8ac0ea0d68fa
Parents: d138d17
Author: kyotoYaho <nj...@apache.org>
Authored: Fri Jun 17 17:14:09 2016 +0800
Committer: kyotoYaho <nj...@apache.org>
Committed: Fri Jun 17 17:14:09 2016 +0800

----------------------------------------------------------------------
 .../storage/hbase/util/StorageCleanupJob.java   | 56 +++++++++++++++-----
 1 file changed, 43 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
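The patch bounds each HTable drop with a FutureTask run on a single-thread executor, so one unresponsive disable/delete call can no longer stall the whole cleanup run past the configured threshold. Below is a minimal, self-contained sketch of that timeout pattern, not the patch itself: the sleeping task is a hypothetical stand-in for the HBaseAdmin disable/delete calls, and the 2-second/10-second values stand in for the patch's 10-minute TIME_THRESHOLD_DELETE_HTABLE.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedDeleteSketch {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            // Stand-in for DeleteHTableRunnable: sleeps past the timeout so the
            // TimeoutException branch below is exercised.
            FutureTask<Object> futureTask = new FutureTask<Object>(() -> {
                Thread.sleep(TimeUnit.SECONDS.toMillis(10)); // simulated slow deleteTable()
                return null;
            });
            executorService.execute(futureTask);
            try {
                // Wait a bounded time, as the patch does for each table.
                futureTask.get(2, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                System.out.println("Task exceeded the threshold; cancelling");
                futureTask.cancel(true); // interrupts the worker thread
            } catch (Exception e) {
                futureTask.cancel(true);
            } finally {
                executorService.shutdown();
            }
        }
    }

Note that cancel(true) only interrupts the worker thread; if the underlying HBase call ignores interruption, the server-side operation may still complete, but the cleanup job itself moves on to the next table instead of hanging.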


http://git-wip-us.apache.org/repos/asf/kylin/blob/922c9c41/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 2137f57..f5daf02 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -24,6 +24,7 @@ import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.*;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -64,6 +65,8 @@ public class StorageCleanupJob extends AbstractHadoopJob {
 
     public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000l; // 2 days
 
+    public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // unit: minutes
+
     boolean delete = false;
 
     protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -146,19 +149,21 @@ public class StorageCleanupJob extends AbstractHadoopJob {
 
         if (delete == true) {
             // drop tables
+            ExecutorService executorService = Executors.newSingleThreadExecutor();
             for (String htableName : allTablesNeedToBeDropped) {
-                logger.info("Deleting HBase table " + htableName);
-                if (hbaseAdmin.tableExists(htableName)) {
-                    if (hbaseAdmin.isTableEnabled(htableName)) {
-                        hbaseAdmin.disableTable(htableName);
-                    }
-
-                    hbaseAdmin.deleteTable(htableName);
-                    logger.info("Deleted HBase table " + htableName);
-                } else {
-                    logger.info("HBase table" + htableName + " does not exist");
+                FutureTask<Object> futureTask = new FutureTask<Object>(new DeleteHTableRunnable(hbaseAdmin, htableName));
+                executorService.execute(futureTask);
+                try {
+                    futureTask.get(TIME_THRESHOLD_DELETE_HTABLE, TimeUnit.MINUTES);
+                } catch (TimeoutException e) {
+                    logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + TIME_THRESHOLD_DELETE_HTABLE + " minutes!");
+                    futureTask.cancel(true);
+                } catch (Exception e) {
+                    logger.error("Failed to delete HTable " + htableName, e);
+                    futureTask.cancel(true);
                 }
             }
+            executorService.shutdown();
         } else {
             System.out.println("--------------- Tables To Be Dropped ---------------");
             for (String htableName : allTablesNeedToBeDropped) {
@@ -170,6 +175,31 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         hbaseAdmin.close();
     }
 
+    class DeleteHTableRunnable implements Callable<Object> {
+        HBaseAdmin hbaseAdmin;
+        String htableName;
+
+        DeleteHTableRunnable(HBaseAdmin hbaseAdmin, String htableName) {
+            this.hbaseAdmin = hbaseAdmin;
+            this.htableName = htableName;
+        }
+
+        public Object call() throws Exception {
+            logger.info("Deleting HBase table " + htableName);
+            if (hbaseAdmin.tableExists(htableName)) {
+                if (hbaseAdmin.isTableEnabled(htableName)) {
+                    hbaseAdmin.disableTable(htableName);
+                }
+
+                hbaseAdmin.deleteTable(htableName);
+                logger.info("Deleted HBase table " + htableName);
+            } else {
+                logger.info("HBase table" + htableName + " does not exist");
+            }
+            return null;
+        }
+    }
+
     private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
         JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -238,14 +268,14 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         final KylinConfig config = KylinConfig.getInstanceFromEnv();
         final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
         final int uuidLength = 36;
-        
+
         final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
         StringBuilder buf = new StringBuilder();
         buf.append("hive -e \"");
         buf.append(useDatabaseHql);
         buf.append("show tables " + "\'kylin_intermediate_*\'" + "; ");
         buf.append("\"");
-        
+
         Pair<Integer, String> result = cmdExec.execute(buf.toString());
 
         String outputStr = result.getSecond();
@@ -290,7 +320,7 @@ public class StorageCleanupJob extends AbstractHadoopJob {
                 logger.info("Remove " + delHive + " from hive tables.");
             }
             buf.append("\"");
-            
+
             try {
                 cmdExec.execute(buf.toString());
             } catch (IOException e) {