You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/04/01 00:21:12 UTC

hbase git commit: HBASE-17668: Implement async assign/offline/move/unassign methods

Repository: hbase
Updated Branches:
  refs/heads/master 80381f394 -> 5f98ad205


HBASE-17668: Implement async assign/offline/move/unassign methods

Signed-off-by: zhangduo <zh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5f98ad20
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5f98ad20
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5f98ad20

Branch: refs/heads/master
Commit: 5f98ad2053ddc31e0abc6863478db594e4447cf8
Parents: 80381f3
Author: huzheng <op...@gmail.com>
Authored: Wed Mar 29 18:37:33 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Apr 1 08:19:14 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/AsyncAdmin.java  |  36 +++++
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    | 147 ++++++++++++++++++-
 .../hadoop/hbase/client/TestAsyncAdminBase.java |   2 +-
 .../hbase/client/TestAsyncRegionAdminApi.java   | 146 +++++++++++++++++-
 4 files changed, 326 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5f98ad20/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 9945c40..ef7a4f2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -431,4 +431,40 @@ public interface AsyncAdmin {
    * @param splitPoint the explicit position to split on
    */
   CompletableFuture<Void> splitRegion(final byte[] regionName, final byte[] splitPoint);
+
+  /** Assign a region.
+   * @param regionName Encoded or full name of region to assign.
+   */
+  CompletableFuture<Void> assign(final byte[] regionName);
+
+  /**
+   * Unassign a region from its current hosting regionserver. Region will then be assigned to a
+   * regionserver chosen at random. Region could be reassigned back to the same server. Use
+   * {@link #move(byte[], byte[])} if you want to control the region movement.
+   * @param regionName Encoded or full name of region to unassign. Will clear any existing
+   *          RegionPlan if one is found.
+   * @param force If true, force unassign (Will remove region from regions-in-transition too if
+   *          present. If this results in double assignment use hbck -fix to resolve. To be used by
+   *          experts).
+   */
+  CompletableFuture<Void> unassign(final byte[] regionName, final boolean force);
+
+  /**
+   * Offline specified region from master's in-memory state. It will not attempt to reassign the
+   * region as in unassign. This API can be used when a region is not served by any region server
+   * but is still online as per Master's in-memory state. If this API is incorrectly used on an
+   * active region then master will lose track of that region. This is a special method that
+   * should be used by experts or hbck.
+   * @param regionName Encoded or full name of region to offline
+   */
+  CompletableFuture<Void> offline(final byte[] regionName);
+
+  /**
+   * Move the region <code>regionName</code> to <code>destServerName</code>.
+   * @param regionName Encoded or full name of region to move.
+   * @param destServerName The servername of the destination regionserver. If passed the empty byte
+   *          array we'll assign to a random server. A server name is made of host, port and
+   *          startcode. Here is an example: <code> host187.example.com,60020,1289493121758</code>
+   */
+  CompletableFuture<Void> move(final byte[] regionName, final byte[] destServerName);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f98ad20/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 54f0766..e42ee57 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.regex.Pattern;
@@ -68,6 +67,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegion
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
@@ -105,10 +106,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColu
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
@@ -260,7 +267,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
                 LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex);
                 failed.add(table);
               }
-            })).toArray(size -> new CompletableFuture[size]);
+            })).<CompletableFuture> toArray(size -> new CompletableFuture[size]);
         CompletableFuture.allOf(futures).thenAccept((v) -> {
           future.complete(failed.toArray(new HTableDescriptor[failed.size()]));
         });
@@ -616,7 +623,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
     return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
       RequestConverter.buildDeleteNamespaceRequest(name),
       (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
-      new ModifyNamespaceProcedureBiConsumer(this, name));
+      new DeleteNamespaceProcedureBiConsumer(this, name));
   }
 
   @Override
@@ -1008,6 +1015,140 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
         .serverName(sn).call();
   }
 
+  /**
+   * Turn regionNameOrEncodedRegionName into the full region name. If the region is not found, the
+   * returned {@link CompletableFuture} completes exceptionally with an IllegalArgumentException.
+   * @param regionNameOrEncodedRegionName full region name or encoded region name
+   * @return a future holding the full region name
+   */
+  CompletableFuture<byte[]> getRegionName(byte[] regionNameOrEncodedRegionName) {
+    CompletableFuture<byte[]> future = new CompletableFuture<>();
+    if (Bytes
+        .equals(regionNameOrEncodedRegionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
+        || Bytes.equals(regionNameOrEncodedRegionName,
+          HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
+      future.complete(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
+      return future;
+    }
+
+    getRegion(regionNameOrEncodedRegionName).whenComplete((p, err) -> {
+      // Exactly one branch completes the future; a failed lookup is reported as-is.
+      if (err != null) {
+        future.completeExceptionally(err);
+      } else if (p != null && p.getFirst() != null) {
+        future.complete(p.getFirst().getRegionName());
+      } else {
+        future.completeExceptionally(
+          new IllegalArgumentException("Invalid region name or encoded region name: "
+              + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Void> assign(byte[] regionName) {
+    CompletableFuture<Void> future = new CompletableFuture<>(); // returned to the caller
+    getRegionName(regionName).whenComplete((fullRegionName, err) -> { // step 1: resolve name
+      if (err != null) {
+        future.completeExceptionally(err); // resolution failed; no master call is made
+      } else {
+        this.<Void> newMasterCaller() // step 2: call the master; the response maps to null
+            .action(
+              ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
+                controller, stub, RequestConverter.buildAssignRegionRequest(fullRegionName),
+                (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
+            .call().whenComplete((ret, err2) -> { // forward the call's outcome to the caller
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Void> unassign(byte[] regionName, boolean force) {
+    CompletableFuture<Void> future = new CompletableFuture<>(); // returned to the caller
+    getRegionName(regionName).whenComplete((fullRegionName, err) -> { // step 1: resolve name
+      if (err != null) {
+        future.completeExceptionally(err); // resolution failed; no master call is made
+      } else {
+        this.<Void> newMasterCaller() // step 2: call the master; the response maps to null
+            .action(((controller, stub) -> this
+                .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
+                  RequestConverter.buildUnassignRegionRequest(fullRegionName, force),
+                  (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
+            .call().whenComplete((ret, err2) -> { // forward the call's outcome to the caller
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Void> offline(byte[] regionName) {
+    CompletableFuture<Void> future = new CompletableFuture<>(); // returned to the caller
+    getRegionName(regionName).whenComplete((fullRegionName, err) -> { // step 1: resolve name
+      if (err != null) {
+        future.completeExceptionally(err); // resolution failed; no master call is made
+      } else {
+        this.<Void> newMasterCaller() // step 2: call the master; the response maps to null
+            .action(
+              ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
+                controller, stub, RequestConverter.buildOfflineRegionRequest(fullRegionName),
+                (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
+            .call().whenComplete((ret, err2) -> { // forward the call's outcome to the caller
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Void> move(byte[] regionName, byte[] destServerName) {
+    CompletableFuture<Void> future = new CompletableFuture<>(); // returned to the caller
+    getRegionName(regionName).whenComplete((fullRegionName, err) -> { // step 1: resolve name
+      if (err != null) {
+        future.completeExceptionally(err); // resolution failed; no master call is made
+      } else {
+        final MoveRegionRequest request;
+        try { // the request is built from the encoded form of the resolved region name
+          request = RequestConverter.buildMoveRegionRequest(
+            Bytes.toBytes(HRegionInfo.encodeRegionName(fullRegionName)), destServerName);
+        } catch (DeserializationException e) {
+          future.completeExceptionally(e); // report the build failure through the future
+          return;
+        }
+        this.<Void> newMasterCaller() // step 2: call the master; the response maps to null
+            .action((controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(
+              controller, stub, request, (s, c, req, done) -> s.moveRegion(c, req, done),
+              resp -> null))
+            .call().whenComplete((ret, err2) -> { // forward the call's outcome to the caller
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+      }
+    });
+    return future;
+  }
+
   private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
     if (numRegions < 3) {
       throw new IllegalArgumentException("Must create at least three regions");

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f98ad20/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
index 583ec64..f0dee0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
@@ -50,7 +50,7 @@ public abstract class TestAsyncAdminBase {
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1000);
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 3000);
     TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
-    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.startMiniCluster(2);
     ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f98ad20/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
index 980e07a..038d6d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -30,13 +31,20 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -209,7 +217,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
     HColumnDescriptor cd = new HColumnDescriptor("d");
     HTableDescriptor td = new HTableDescriptor(tableName);
     td.addFamily(cd);
-    byte[][] splitRows = new byte[][] { "3".getBytes(), "6".getBytes() };
+    byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
     Admin syncAdmin = TEST_UTIL.getAdmin();
     try {
       TEST_UTIL.createTable(td, splitRows);
@@ -296,4 +304,140 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
     assertEquals(count, 2);
   }
 
+  @Test
+  public void testAssignRegionAndUnassignRegion() throws Exception {
+    final TableName tableName = TableName.valueOf("testAssignRegionAndUnassignRegion");
+    try {
+      // create test table
+      HTableDescriptor desc = new HTableDescriptor(tableName);
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      admin.createTable(desc).get();
+
+      // add region to meta; the meta table handle is closed once addRegionToMeta returns.
+      HRegionInfo hri =
+          new HRegionInfo(desc.getTableName(), Bytes.toBytes("A"), Bytes.toBytes("Z"));
+      try (Table meta = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+        MetaTableAccessor.addRegionToMeta(meta, hri);
+      }
+      // assign region.
+      HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+      AssignmentManager am = master.getAssignmentManager();
+      admin.assign(hri.getRegionName()).get();
+      am.waitForAssignment(hri);
+
+      // assert region on server
+      RegionStates regionStates = am.getRegionStates();
+      ServerName serverName = regionStates.getRegionServerOfRegion(hri);
+      TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
+      assertTrue(regionStates.getRegionState(hri).isOpened());
+
+      // Region is assigned now. Let's assign it again.
+      // Master should not abort, and region should be assigned.
+      admin.assign(hri.getRegionName()).get();
+      am.waitForAssignment(hri);
+      assertTrue(regionStates.getRegionState(hri).isOpened());
+
+      // unassign region; the test then waits for it to be assigned and opened again
+      admin.unassign(hri.getRegionName(), true).get();
+      am.waitForAssignment(hri);
+      assertTrue(regionStates.getRegionState(hri).isOpened());
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  // Creates the table pre-split between "A" and "Z" and returns one region once it is listed.
+  HRegionInfo createTableAndGetOneRegion(final TableName tableName)
+      throws IOException, InterruptedException {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    // join() so a failed create surfaces here instead of as a misleading poll timeout below
+    admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).join();
+    // wait till the table is assigned
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    long timeoutTime = System.currentTimeMillis() + 3000;
+    while (true) {
+      List<HRegionInfo> regions =
+          master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
+      if (regions.size() > 3) {
+        return regions.get(2);
+      }
+      if (System.currentTimeMillis() > timeoutTime) {
+        fail("Could not find an online region");
+      }
+      Thread.sleep(10);
+    }
+  }
+
+  @Test
+  public void testOfflineRegion() throws Exception {
+    final TableName tableName = TableName.valueOf("testOfflineRegion");
+    try {
+      HRegionInfo hri = createTableAndGetOneRegion(tableName);
+      // the region must be on a server before it is offlined
+      RegionStates regionStates =
+          TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
+      ServerName serverName = regionStates.getRegionServerOfRegion(hri);
+      TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
+      admin.offline(hri.getRegionName()).get();
+      // poll the master's region states until the region is listed as OFFLINE (3 second budget)
+      long timeoutTime = System.currentTimeMillis() + 3000;
+      while (true) {
+        if (regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OFFLINE)
+            .contains(hri)) {
+          break;
+        }
+        long now = System.currentTimeMillis();
+        if (now > timeoutTime) {
+          fail("Failed to offline the region in time"); // fail() throws; no break needed
+        }
+        Thread.sleep(10);
+      }
+      RegionState regionState = regionStates.getRegionState(hri);
+      assertTrue(regionState.isOffline());
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  @Test
+  public void testMoveRegion() throws Exception {
+    final TableName tableName = TableName.valueOf("testMoveRegion");
+    try {
+      HRegionInfo hri = createTableAndGetOneRegion(tableName);
+      // pick a live, online region server other than the one currently hosting the region
+      HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+      RegionStates regionStates = master.getAssignmentManager().getRegionStates();
+      ServerName serverName = regionStates.getRegionServerOfRegion(hri);
+      ServerManager serverManager = master.getServerManager();
+      ServerName destServerName = null;
+      List<JVMClusterUtil.RegionServerThread> regionServers =
+          TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
+      for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
+        ServerName candidate = regionServer.getRegionServer().getServerName();
+        if (!candidate.equals(serverName) && serverManager.isServerOnline(candidate)) {
+          destServerName = candidate; // only a server that passed both checks is remembered
+          break;
+        }
+      }
+      assertTrue(destServerName != null && !destServerName.equals(serverName));
+      admin.move(hri.getEncodedNameAsBytes(), Bytes.toBytes(destServerName.getServerName())).get();
+
+      long timeoutTime = System.currentTimeMillis() + 30000;
+      while (true) {
+        ServerName sn = regionStates.getRegionServerOfRegion(hri);
+        if (sn != null && sn.equals(destServerName)) {
+          TEST_UTIL.assertRegionOnServer(hri, sn, 200);
+          break;
+        }
+        long now = System.currentTimeMillis();
+        if (now > timeoutTime) {
+          fail("Failed to move the region in time: " + regionStates.getRegionState(hri));
+        }
+        regionStates.waitForUpdate(50);
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
 }