You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/10/30 16:02:58 UTC

[hbase] branch master updated: HBASE-27448 Add an admin method to get replication enabled state (#4855)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new dc4fa05a2bb HBASE-27448 Add an admin method to get replication enabled state (#4855)
dc4fa05a2bb is described below

commit dc4fa05a2bb80f309b0204b63ff6b299d5fe0383
Author: LiangJun He <20...@163.com>
AuthorDate: Mon Oct 31 00:02:46 2022 +0800

    HBASE-27448 Add an admin method to get replication enabled state (#4855)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../main/java/org/apache/hadoop/hbase/client/Admin.java  |  8 ++++++++
 .../apache/hadoop/hbase/client/AdminOverAsyncAdmin.java  |  5 +++++
 .../java/org/apache/hadoop/hbase/client/AsyncAdmin.java  |  8 ++++++++
 .../org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java  |  5 +++++
 .../apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java   | 14 ++++++++++++++
 .../src/main/protobuf/server/master/Master.proto         |  3 +++
 .../src/main/protobuf/server/master/Replication.proto    |  7 +++++++
 .../apache/hadoop/hbase/master/MasterRpcServices.java    | 14 ++++++++++++++
 .../hbase/master/replication/ReplicationPeerManager.java |  9 +++++++++
 .../hbase/replication/TestReplicationSmallTests.java     | 16 ++++++++++++++++
 .../hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java      |  5 +++++
 .../apache/hadoop/hbase/thrift2/client/ThriftAdmin.java  |  5 +++++
 12 files changed, 99 insertions(+)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index f5da0aa0bde..ff008c15c27 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -2163,6 +2163,14 @@ public interface Admin extends Abortable, Closeable {
     return peers.get(0).getSyncReplicationState();
   }
 
+  /**
+   * Check if a replication peer is enabled.
+   * @param peerId id of replication peer to check
+   * @return <code>true</code> if replication peer is enabled
+   * @throws IOException if a remote or network exception occurs
+   */
+  boolean isReplicationPeerEnabled(String peerId) throws IOException;
+
   /**
    * Mark region server(s) as decommissioned to prevent additional regions from getting assigned to
    * them. Optionally unload the regions on the servers. If there are multiple servers to be
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
index 9e2b990d91c..a199adc17c2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
@@ -876,6 +876,11 @@ class AdminOverAsyncAdmin implements Admin {
     return admin.transitReplicationPeerSyncReplicationState(peerId, state);
   }
 
+  @Override
+  public boolean isReplicationPeerEnabled(String peerId) throws IOException {
+    return get(admin.isReplicationPeerEnabled(peerId));
+  }
+
   @Override
   public void decommissionRegionServers(List<ServerName> servers, boolean offload)
     throws IOException {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 6070c553f5e..680aa4cc87d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -795,6 +795,14 @@ public interface AsyncAdmin {
    */
   CompletableFuture<Void> disableTableReplication(TableName tableName);
 
+  /**
+   * Check if a replication peer is enabled.
+   * @param peerId id of replication peer to check
+   * @return true if replication peer is enabled. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);
+
   /**
    * Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
    * taken. If the table is disabled, an offline snapshot is taken. Snapshots are taken sequentially
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index a8f93dd506d..fba883c1bbe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -481,6 +481,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
     return wrap(rawAdmin.disableTableReplication(tableName));
   }
 
+  @Override
+  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
+    return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
+  }
+
   @Override
   public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
     return wrap(rawAdmin.snapshot(snapshot));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 4d614907326..21cc1d17b0c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -325,6 +325,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
@@ -3734,6 +3736,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     return future;
   }
 
+  @Override
+  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
+    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
+    request.setPeerId(peerId);
+    return this.<Boolean> newMasterCaller()
+      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
+        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
+          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
+          resp -> resp.getIsEnabled()))
+      .call();
+  }
+
   @Override
   public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
     CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index 257abe8f11c..f5d4a80f148 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -1106,6 +1106,9 @@ service MasterService {
   rpc TransitReplicationPeerSyncReplicationState(TransitReplicationPeerSyncReplicationStateRequest)
     returns(TransitReplicationPeerSyncReplicationStateResponse);
 
+  rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest)
+    returns(GetReplicationPeerStateResponse);
+
   /** Returns a list of ServerNames marked as decommissioned. */
   rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
     returns(ListDecommissionedRegionServersResponse);
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto
index 6619c9694a4..24e459b3978 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto
@@ -161,3 +161,10 @@ message TransitReplicationPeerSyncReplicationStateRequest {
 message TransitReplicationPeerSyncReplicationStateResponse {
   required uint64 proc_id = 1;
 }
+
+message GetReplicationPeerStateRequest {
+  required string peer_id = 1;
+}
+message GetReplicationPeerStateResponse {
+  required bool is_enabled = 1;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 4a490b1e127..a37c9e35a45 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -417,6 +417,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
@@ -2105,6 +2107,18 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
     return response.build();
   }
 
+  @Override
+  public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller,
+    GetReplicationPeerStateRequest request) throws ServiceException {
+    boolean isEnabled;
+    try {
+      isEnabled = server.getReplicationPeerManager().getPeerState(request.getPeerId());
+    } catch (ReplicationException ioe) {
+      throw new ServiceException(ioe);
+    }
+    return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build();
+  }
+
   @Override
   public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
     RpcController controller, ListDecommissionedRegionServersRequest request)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 0d4e11197cd..06cf559d492 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -271,6 +271,15 @@ public class ReplicationPeerManager {
       desc.getSyncReplicationState()));
   }
 
+  public boolean getPeerState(String peerId) throws ReplicationException {
+    ReplicationPeerDescription desc = peers.get(peerId);
+    if (desc != null) {
+      return desc.isEnabled();
+    } else {
+      throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
+    }
+  }
+
   public void enablePeer(String peerId) throws ReplicationException {
     setPeerState(peerId, true);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 92f8e17ed59..3d9fa06d2e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -441,4 +442,19 @@ public class TestReplicationSmallTests extends TestReplicationBase {
       }
     }
   }
+
+  /**
+   * Test for HBASE-27448 Add an admin method to get replication enabled state
+   */
+  @Test
+  public void testGetReplicationPeerState() throws Exception {
+
+    // Test disable replication peer
+    hbaseAdmin.disableReplicationPeer("2");
+    assertFalse(hbaseAdmin.isReplicationPeerEnabled("2"));
+
+    // Test enable replication peer
+    hbaseAdmin.enableReplicationPeer("2");
+    assertTrue(hbaseAdmin.isReplicationPeerEnabled("2"));
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
index 3c0658455f3..37bef49b491 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
@@ -676,6 +676,11 @@ public class VerifyingRSGroupAdmin implements Admin, Closeable {
     return admin.transitReplicationPeerSyncReplicationStateAsync(peerId, state);
   }
 
+  @Override
+  public boolean isReplicationPeerEnabled(String peerId) throws IOException {
+    return admin.isReplicationPeerEnabled(peerId);
+  }
+
   public void decommissionRegionServers(List<ServerName> servers, boolean offload)
     throws IOException {
     admin.decommissionRegionServers(servers, offload);
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 13a1b9920ec..0842497f952 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -1031,6 +1031,11 @@ public class ThriftAdmin implements Admin {
       "transitReplicationPeerSyncReplicationStateAsync not supported in ThriftAdmin");
   }
 
+  @Override
+  public boolean isReplicationPeerEnabled(String peerId) throws IOException {
+    throw new NotImplementedException("isReplicationPeerEnabled not supported in ThriftAdmin");
+  }
+
   @Override
   public void decommissionRegionServers(List<ServerName> servers, boolean offload) {
     throw new NotImplementedException("decommissionRegionServers not supported in ThriftAdmin");