You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/20 23:23:03 UTC
[39/50] [abbrv] kylin git commit: KYLIN-2089 Make update HBase
coprocessor concurrent
KYLIN-2089 Make update HBase coprocessor concurrent
Signed-off-by: Li Yang <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b7e8065c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b7e8065c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b7e8065c
Branch: refs/heads/KYLIN-1971
Commit: b7e8065c0b44eb45ec11fd3b498fd72652782b84
Parents: eef157c
Author: kangkaisen <ka...@live.com>
Authored: Wed Oct 12 20:12:15 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Oct 20 16:43:51 2016 +0800
----------------------------------------------------------------------
.../hbase/util/DeployCoprocessorCLI.java | 55 ++++++++++++++++----
1 file changed, 46 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b7e8065c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index a1193e7..cc9b988 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -23,10 +23,14 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import org.apache.commons.io.IOUtils;
@@ -107,14 +111,15 @@ public class DeployCoprocessorCLI {
Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
- List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
+ resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
// Don't remove old jars, missing coprocessor jar will fail hbase
// removeOldJars(oldJarPaths, fileSystem);
hbaseAdmin.close();
- logger.info("Processed " + processedTables);
+ logger.info("Processed tables count: " + processedTables.size());
+ logger.info("Processed tables: " + processedTables);
logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
}
@@ -220,18 +225,50 @@ public class DeployCoprocessorCLI {
hbaseAdmin.enableTable(tableName);
}
- private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
- List<String> processed = new ArrayList<String>();
-
- for (String tableName : tableNames) {
+ private static List<String> processedTables = Collections.synchronizedList(new ArrayList<String>());
+
+ private static void resetCoprocessorOnHTables(final HBaseAdmin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+ ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+ CountDownLatch countDownLatch = new CountDownLatch(tableNames.size());
+
+ for (final String tableName : tableNames) {
+ coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName));
+ }
+
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ logger.error("reset coprocessor failed: ", e);
+ }
+
+ coprocessorPool.shutdown();
+ }
+
+ private static class ResetCoprocessorWorker implements Runnable {
+ private final CountDownLatch countDownLatch;
+ private final HBaseAdmin hbaseAdmin;
+ private final Path hdfsCoprocessorJar;
+ private final String tableName;
+
+ public ResetCoprocessorWorker(CountDownLatch countDownLatch, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, String tableName) {
+ this.countDownLatch = countDownLatch;
+ this.hbaseAdmin = hbaseAdmin;
+ this.hdfsCoprocessorJar = hdfsCoprocessorJar;
+ this.tableName = tableName;
+ }
+
+ @Override
+ public void run() {
try {
resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
- processed.add(tableName);
- } catch (IOException ex) {
+ processedTables.add(tableName);
+ } catch (Exception ex) {
logger.error("Error processing " + tableName, ex);
+ } finally {
+ countDownLatch.countDown();
}
+
}
- return processed;
}
public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {