You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/20 23:23:03 UTC

[39/50] [abbrv] kylin git commit: KYLIN-2089 Make HBase coprocessor update concurrent

KYLIN-2089 Make HBase coprocessor update concurrent

Signed-off-by: Li Yang <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b7e8065c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b7e8065c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b7e8065c

Branch: refs/heads/KYLIN-1971
Commit: b7e8065c0b44eb45ec11fd3b498fd72652782b84
Parents: eef157c
Author: kangkaisen <ka...@live.com>
Authored: Wed Oct 12 20:12:15 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Oct 20 16:43:51 2016 +0800

----------------------------------------------------------------------
 .../hbase/util/DeployCoprocessorCLI.java        | 55 ++++++++++++++++----
 1 file changed, 46 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b7e8065c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index a1193e7..cc9b988 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -23,10 +23,14 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.regex.Matcher;
 
 import org.apache.commons.io.IOUtils;
@@ -107,14 +111,15 @@ public class DeployCoprocessorCLI {
         Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
         logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
 
-        List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
+        resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
 
         // Don't remove old jars, missing coprocessor jar will fail hbase
         // removeOldJars(oldJarPaths, fileSystem);
 
         hbaseAdmin.close();
 
-        logger.info("Processed " + processedTables);
+        logger.info("Processed tables count: " + processedTables.size());
+        logger.info("Processed tables: " + processedTables);
         logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
     }
 
@@ -220,18 +225,50 @@ public class DeployCoprocessorCLI {
         hbaseAdmin.enableTable(tableName);
     }
 
-    private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
-        List<String> processed = new ArrayList<String>();
-
-        for (String tableName : tableNames) {
+    private static List<String> processedTables = Collections.synchronizedList(new ArrayList<String>());
+
+    private static void resetCoprocessorOnHTables(final HBaseAdmin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+        ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+        CountDownLatch countDownLatch = new CountDownLatch(tableNames.size());
+
+        for (final String tableName : tableNames) {
+            coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName));
+        }
+
+        try {
+            countDownLatch.await();
+        } catch (InterruptedException e) {
+            logger.error("reset coprocessor failed: ", e);
+        }
+
+        coprocessorPool.shutdown();
+    }
+
+    private static class ResetCoprocessorWorker implements Runnable {
+        private final CountDownLatch countDownLatch;
+        private final HBaseAdmin hbaseAdmin;
+        private final Path hdfsCoprocessorJar;
+        private final String tableName;
+
+        public ResetCoprocessorWorker(CountDownLatch countDownLatch, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, String tableName) {
+            this.countDownLatch = countDownLatch;
+            this.hbaseAdmin = hbaseAdmin;
+            this.hdfsCoprocessorJar = hdfsCoprocessorJar;
+            this.tableName = tableName;
+        }
+
+        @Override
+        public void run() {
             try {
                 resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
-                processed.add(tableName);
-            } catch (IOException ex) {
+                processedTables.add(tableName);
+            } catch (Exception ex) {
                 logger.error("Error processing " + tableName, ex);
+            } finally {
+                countDownLatch.countDown();
             }
+
         }
-        return processed;
     }
 
     public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {