You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/11/03 05:34:27 UTC

[7/7] kylin git commit: KYLIN-2145 StorageCleanupJob will fail when beeline enabled

KYLIN-2145 StorageCleanupJob will fail when beeline enabled


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cad3def9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cad3def9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cad3def9

Branch: refs/heads/master
Commit: cad3def9e21fcfc07767e3a1b72f5b1bf8cae3de
Parents: 7358a78
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Nov 1 18:32:31 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Nov 3 13:34:12 2016 +0800

----------------------------------------------------------------------
 tool/pom.xml                                    |  4 ++
 .../apache/kylin/tool/StorageCleanupJob.java    | 44 ++++++++++----------
 2 files changed, 25 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cad3def9/tool/pom.xml
----------------------------------------------------------------------
diff --git a/tool/pom.xml b/tool/pom.xml
index e530469..e3d7bfa 100644
--- a/tool/pom.xml
+++ b/tool/pom.xml
@@ -42,6 +42,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-source-kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-source-hive</artifactId>
+        </dependency>
 
         <!--Env-->
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/cad3def9/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index c1ff753..56681af 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -18,9 +18,7 @@
 
 package org.apache.kylin.tool;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -31,6 +29,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 
+import javax.annotation.Nullable;
+
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -46,7 +46,6 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -56,10 +55,15 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.storage.hbase.util.HiveCmdBuilder;
+import org.apache.kylin.source.hive.HiveClientFactory;
+import org.apache.kylin.source.hive.HiveCmdBuilder;
+import org.apache.kylin.source.hive.IHiveClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
 public class StorageCleanupJob extends AbstractApplication {
 
     @SuppressWarnings("static-access")
@@ -244,27 +248,24 @@ public class StorageCleanupJob extends AbstractApplication {
             }
             System.out.println("-------------------------------------------------------");
         }
-
     }
 
-    private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException {
+    private void cleanUnusedIntermediateHiveTable(Configuration conf) throws Exception {
         final KylinConfig config = KylinConfig.getInstanceFromEnv();
         final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
         final int uuidLength = 36;
         final String preFix = "kylin_intermediate_";
         final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
 
-        
-        final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
-        final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-        hiveCmdBuilder.addStatement(useDatabaseHql);
-        hiveCmdBuilder.addStatement("show tables " + "\'kylin_intermediate_*\'" + "; ");
-
-        Pair<Integer, String> result = cmdExec.execute(hiveCmdBuilder.build());
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        List<String> hiveTableNames = hiveClient.getHiveTableNames(config.getHiveDatabaseForIntermediateTable());
+        Iterable<String> kylinIntermediates = Iterables.filter(hiveTableNames, new Predicate<String>() {
+            @Override
+            public boolean apply(@Nullable String input) {
+                return input != null && input.startsWith("kylin_intermediate_");
+            }
+        });
 
-        String outputStr = result.getSecond();
-        BufferedReader reader = new BufferedReader(new StringReader(outputStr));
-        String line = null;
         List<String> allJobs = executableManager.getAllJobIds();
         List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
         List<String> workingJobList = new ArrayList<String>();
@@ -280,15 +281,14 @@ public class StorageCleanupJob extends AbstractApplication {
         }
         logger.info("Working jobIDs: " + workingJobList);
 
-        while ((line = reader.readLine()) != null) {
-
+        for (String line : kylinIntermediates) {
             logger.info("Checking table " + line);
 
             if (!line.startsWith(preFix))
                 continue;
 
             if (force == true) {
-                logger.warn("!!!!!!!!!!!!!!!Warning: will delete all intermediate hive tables!!!!!!!!!!!!!!!!!!!!!!");
+                logger.warn("Warning: will delete all intermediate hive tables!!!!!!!!!!!!!!!!!!!!!!");
                 allHiveTablesNeedToBeDeleted.add(line);
                 continue;
             }
@@ -320,7 +320,8 @@ public class StorageCleanupJob extends AbstractApplication {
         }
 
         if (delete == true) {
-            hiveCmdBuilder.reset();
+            final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
             hiveCmdBuilder.addStatement(useDatabaseHql);
             for (String delHive : allHiveTablesNeedToBeDeleted) {
                 hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; ");
@@ -339,9 +340,6 @@ public class StorageCleanupJob extends AbstractApplication {
             }
             System.out.println("----------------------------------------------------");
         }
-
-        if (reader != null)
-            reader.close();
     }
 
     private boolean isTableInUse(String segUuid, List<String> workingJobList) {