You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/06/24 07:05:01 UTC
[23/50] kylin git commit: KYLIN-1805: Fix got stuck when deleting
HTables during running the StorageCleanupJob
KYLIN-1805: Fix got stuck when deleting HTables during running the StorageCleanupJob
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c4731de6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c4731de6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c4731de6
Branch: refs/heads/stream_m1
Commit: c4731de61faf3806cdba8a312d3ef66feaece14d
Parents: 76d60b6
Author: kyotoYaho <nj...@apache.org>
Authored: Mon Jun 20 12:48:58 2016 +0800
Committer: Zhong <ya...@lm-shc-16501214.corp.ebay.com>
Committed: Mon Jun 20 12:49:43 2016 +0800
----------------------------------------------------------------------
.../storage/hbase/util/StorageCleanupJob.java | 52 +++++++++++++++-----
1 file changed, 41 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c4731de6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index a5db52f..ac35ccf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -24,6 +24,7 @@ import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.*;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
@@ -62,6 +63,8 @@ public class StorageCleanupJob extends AbstractHadoopJob {
protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
+ public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // Unit minute
+
boolean delete = false;
protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -144,19 +147,21 @@ public class StorageCleanupJob extends AbstractHadoopJob {
if (delete == true) {
// drop tables
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
for (String htableName : allTablesNeedToBeDropped) {
- logger.info("Deleting HBase table " + htableName);
- if (hbaseAdmin.tableExists(htableName)) {
- if (hbaseAdmin.isTableEnabled(htableName)) {
- hbaseAdmin.disableTable(htableName);
- }
-
- hbaseAdmin.deleteTable(htableName);
- logger.info("Deleted HBase table " + htableName);
- } else {
- logger.info("HBase table" + htableName + " does not exist");
+ FutureTask futureTask = new FutureTask(new DeleteHTableRunnable(hbaseAdmin, htableName));
+ executorService.execute(futureTask);
+ try {
+ futureTask.get(TIME_THRESHOLD_DELETE_HTABLE, TimeUnit.MINUTES);
+ } catch (TimeoutException e) {
+ logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + TIME_THRESHOLD_DELETE_HTABLE + " minutes!");
+ futureTask.cancel(true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ futureTask.cancel(true);
}
}
+ executorService.shutdown();
} else {
System.out.println("--------------- Tables To Be Dropped ---------------");
for (String htableName : allTablesNeedToBeDropped) {
@@ -168,6 +173,31 @@ public class StorageCleanupJob extends AbstractHadoopJob {
hbaseAdmin.close();
}
+ class DeleteHTableRunnable implements Callable { // Task that drops a single HBase table; note it is a Callable despite the "Runnable" name
+ HBaseAdmin hbaseAdmin; // admin handle used for every table operation in call()
+ String htableName; // name of the table this task drops
+
+ DeleteHTableRunnable(HBaseAdmin hbaseAdmin, String htableName) { // stores both arguments as-is, no validation
+ this.hbaseAdmin = hbaseAdmin;
+ this.htableName = htableName;
+ }
+
+ public Object call() throws Exception { // deletes the table if it exists, otherwise only logs; always returns null
+ logger.info("Deleting HBase table " + htableName);
+ if (hbaseAdmin.tableExists(htableName)) {
+ if (hbaseAdmin.isTableEnabled(htableName)) { // an enabled table is disabled first, before deleteTable is called
+ hbaseAdmin.disableTable(htableName);
+ }
+
+ hbaseAdmin.deleteTable(htableName);
+ logger.info("Deleted HBase table " + htableName);
+ } else {
+ logger.info("HBase table" + htableName + " does not exist");
+ }
+ return null; // no meaningful result; Callable is used so that exceptions can propagate to the waiting caller
+ }
+ }
+
private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -236,7 +266,7 @@ public class StorageCleanupJob extends AbstractHadoopJob {
final KylinConfig config = KylinConfig.getInstanceFromEnv();
final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
final int uuidLength = 36;
-
+
final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
hiveCmdBuilder.addStatement(useDatabaseHql);