You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/08/15 03:13:31 UTC

[hbase] branch branch-2.2 updated: HBASE-22810 Initialize an separate ThreadPoolExecutor for taking/restoring snapshot (#486)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new b4734e7  HBASE-22810 Initialize an separate ThreadPoolExecutor for taking/restoring snapshot (#486)
b4734e7 is described below

commit b4734e766785b78dd2d011af00c40dc35e54a5e6
Author: openinx <op...@gmail.com>
AuthorDate: Thu Aug 15 10:57:42 2019 +0800

    HBASE-22810 Initialize an separate ThreadPoolExecutor for taking/restoring snapshot (#486)
---
 .../java/org/apache/hadoop/hbase/HConstants.java   | 27 ++++++++++++++++++
 .../apache/hadoop/hbase/executor/EventType.java    |  9 ++----
 .../apache/hadoop/hbase/executor/ExecutorType.java |  1 +
 .../org/apache/hadoop/hbase/master/HMaster.java    | 29 +++++++++++--------
 .../hadoop/hbase/executor/TestExecutorService.java | 33 ++++++++++++++++++++++
 5 files changed, 80 insertions(+), 19 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index e231af5..2ce838f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1456,6 +1456,33 @@ public final class HConstants {
       "hbase.util.default.lossycounting.errorrate";
   public static final String NOT_IMPLEMENTED = "Not implemented";
 
+  /**
+   * Configurations for master executor services.
+   */
+  public static final String MASTER_OPEN_REGION_THREADS =
+      "hbase.master.executor.openregion.threads";
+  public static final int MASTER_OPEN_REGION_THREADS_DEFAULT = 5;
+
+  public static final String MASTER_CLOSE_REGION_THREADS =
+      "hbase.master.executor.closeregion.threads";
+  public static final int MASTER_CLOSE_REGION_THREADS_DEFAULT = 5;
+
+  public static final String MASTER_SERVER_OPERATIONS_THREADS =
+      "hbase.master.executor.serverops.threads";
+  public static final int MASTER_SERVER_OPERATIONS_THREADS_DEFAULT = 5;
+
+  public static final String MASTER_META_SERVER_OPERATIONS_THREADS =
+      "hbase.master.executor.meta.serverops.threads";
+  public static final int MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT = 5;
+
+  public static final String MASTER_LOG_REPLAY_OPS_THREADS =
+      "hbase.master.executor.logreplayops.threads";
+  public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
+
+  public static final String MASTER_SNAPSHOT_OPERATIONS_THREADS =
+      "hbase.master.executor.snapshot.threads";
+  public static final int MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT = 3;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index 1ae9db2..80c2717 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -206,13 +206,13 @@ public enum EventType {
    * C_M_SNAPSHOT_TABLE<br>
    * Client asking Master to snapshot an offline table.
    */
-  C_M_SNAPSHOT_TABLE        (48, ExecutorType.MASTER_TABLE_OPERATIONS),
+  C_M_SNAPSHOT_TABLE        (48, ExecutorType.MASTER_SNAPSHOT_OPERATIONS),
   /**
    * Messages originating from Client to Master.<br>
    * C_M_RESTORE_SNAPSHOT<br>
    * Client asking Master to restore a snapshot.
    */
-  C_M_RESTORE_SNAPSHOT      (49, ExecutorType.MASTER_TABLE_OPERATIONS),
+  C_M_RESTORE_SNAPSHOT      (49, ExecutorType.MASTER_SNAPSHOT_OPERATIONS),
 
   // Updates from master to ZK. This is done by the master and there is
   // nothing to process by either Master or RS
@@ -314,11 +314,6 @@ public enum EventType {
     throw new IllegalArgumentException("Unknown code " + code);
   }
 
-  public boolean isOnlineSchemaChangeSupported() {
-    return this.equals(EventType.C_M_ADD_FAMILY) || this.equals(EventType.C_M_DELETE_FAMILY) ||
-      this.equals(EventType.C_M_MODIFY_FAMILY) || this.equals(EventType.C_M_MODIFY_TABLE);
-  }
-
   ExecutorType getExecutorServiceType() {
     return this.executor;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index 596385d..66baccb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -34,6 +34,7 @@ public enum ExecutorType {
   MASTER_RS_SHUTDOWN         (5),
   MASTER_META_SERVER_OPERATIONS (6),
   M_LOG_REPLAY_OPS           (7),
+  MASTER_SNAPSHOT_OPERATIONS (8),
 
   // RegionServer executor services
   RS_OPEN_REGION             (20),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index ebb8801..9b5b900 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1392,18 +1392,23 @@ public class HMaster extends HRegionServer implements MasterServices {
    *  as OOMEs; it should be lightly loaded. See what HRegionServer does if
    *  need to install an unexpected exception handler.
    */
-  private void startServiceThreads() throws IOException{
-   // Start the executor service pools
-   this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
-      conf.getInt("hbase.master.executor.openregion.threads", 5));
-   this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
-      conf.getInt("hbase.master.executor.closeregion.threads", 5));
-   this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
-      conf.getInt("hbase.master.executor.serverops.threads", 5));
-   this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
-      conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
-   this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
-      conf.getInt("hbase.master.executor.logreplayops.threads", 10));
+  private void startServiceThreads() throws IOException {
+    // Start the executor service pools
+    this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
+      HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
+    this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
+      HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
+    this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
+      conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
+        HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
+    this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
+      conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
+        HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
+    this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
+      HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
+    this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS,
+      conf.getInt(HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS,
+        HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT));
 
    // We depend on there being only one instance of this executor running
    // at a time.  To do concurrency, would need fencing of enable/disable of
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
index f6e9409..205c6c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -219,5 +221,36 @@ public class TestExecutorService {
     executorService.shutdown();
   }
 
+  @Test
+  public void testSnapshotHandlers() throws Exception {
+    final Configuration conf = HBaseConfiguration.create();
+    final Server server = mock(Server.class);
+    when(server.getConfiguration()).thenReturn(conf);
+
+    ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
+    executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) {
+      @Override
+      public void process() throws IOException {
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    });
+
+    int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
+        .getThreadPoolExecutor().getActiveCount();
+    Assert.assertEquals(activeCount, 1);
+    latch.countDown();
+    Waiter.waitFor(conf, 3000, () -> {
+      int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
+          .getThreadPoolExecutor().getActiveCount();
+      return count == 0;
+    });
+  }
 }