You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/11/01 10:32:34 UTC
[2/2] kylin git commit: KYLIN-2145 StorageCleanupJob will fail when
beeline enabled
KYLIN-2145 StorageCleanupJob will fail when beeline enabled
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6cc70528
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6cc70528
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6cc70528
Branch: refs/heads/yang21
Commit: 6cc70528342d295ad9122f3032f050c9c2e80bf8
Parents: d2fde80
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Nov 1 18:32:31 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Tue Nov 1 18:32:31 2016 +0800
----------------------------------------------------------------------
tool/pom.xml | 4 ++
.../apache/kylin/tool/StorageCleanupJob.java | 44 ++++++++++----------
2 files changed, 25 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/6cc70528/tool/pom.xml
----------------------------------------------------------------------
diff --git a/tool/pom.xml b/tool/pom.xml
index e530469..e3d7bfa 100644
--- a/tool/pom.xml
+++ b/tool/pom.xml
@@ -42,6 +42,10 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-source-kafka</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-source-hive</artifactId>
+ </dependency>
<!--Env-->
<dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/6cc70528/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index c1ff753..56681af 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -18,9 +18,7 @@
package org.apache.kylin.tool;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -31,6 +29,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -46,7 +46,6 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -56,10 +55,15 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.storage.hbase.util.HiveCmdBuilder;
+import org.apache.kylin.source.hive.HiveClientFactory;
+import org.apache.kylin.source.hive.HiveCmdBuilder;
+import org.apache.kylin.source.hive.IHiveClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
public class StorageCleanupJob extends AbstractApplication {
@SuppressWarnings("static-access")
@@ -244,27 +248,24 @@ public class StorageCleanupJob extends AbstractApplication {
}
System.out.println("-------------------------------------------------------");
}
-
}
- private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException {
+ private void cleanUnusedIntermediateHiveTable(Configuration conf) throws Exception {
final KylinConfig config = KylinConfig.getInstanceFromEnv();
final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
final int uuidLength = 36;
final String preFix = "kylin_intermediate_";
final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
-
- final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- hiveCmdBuilder.addStatement(useDatabaseHql);
- hiveCmdBuilder.addStatement("show tables " + "\'kylin_intermediate_*\'" + "; ");
-
- Pair<Integer, String> result = cmdExec.execute(hiveCmdBuilder.build());
+ IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+ List<String> hiveTableNames = hiveClient.getHiveTableNames(config.getHiveDatabaseForIntermediateTable());
+ Iterable<String> kylinIntermediates = Iterables.filter(hiveTableNames, new Predicate<String>() {
+ @Override
+ public boolean apply(@Nullable String input) {
+ return input != null && input.startsWith("kylin_intermediate_");
+ }
+ });
- String outputStr = result.getSecond();
- BufferedReader reader = new BufferedReader(new StringReader(outputStr));
- String line = null;
List<String> allJobs = executableManager.getAllJobIds();
List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
List<String> workingJobList = new ArrayList<String>();
@@ -280,15 +281,14 @@ public class StorageCleanupJob extends AbstractApplication {
}
logger.info("Working jobIDs: " + workingJobList);
- while ((line = reader.readLine()) != null) {
-
+ for (String line : kylinIntermediates) {
logger.info("Checking table " + line);
if (!line.startsWith(preFix))
continue;
if (force == true) {
- logger.warn("!!!!!!!!!!!!!!!Warning: will delete all intermediate hive tables!!!!!!!!!!!!!!!!!!!!!!");
+ logger.warn("Warning: will delete all intermediate hive tables!!!!!!!!!!!!!!!!!!!!!!");
allHiveTablesNeedToBeDeleted.add(line);
continue;
}
@@ -320,7 +320,8 @@ public class StorageCleanupJob extends AbstractApplication {
}
if (delete == true) {
- hiveCmdBuilder.reset();
+ final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
hiveCmdBuilder.addStatement(useDatabaseHql);
for (String delHive : allHiveTablesNeedToBeDeleted) {
hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; ");
@@ -339,9 +340,6 @@ public class StorageCleanupJob extends AbstractApplication {
}
System.out.println("----------------------------------------------------");
}
-
- if (reader != null)
- reader.close();
}
private boolean isTableInUse(String segUuid, List<String> workingJobList) {