You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/02/23 01:57:45 UTC
[hbase] 05/10: HBASE-24982 Disassemble the method replicateWALEntry
from AdminService to a new interface ReplicationServerService (#2360)
This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 1f11ee4ad71b032555b2e46f56d744ccf621fa61
Author: XinSun <dd...@gmail.com>
AuthorDate: Wed Sep 9 15:00:37 2020 +0800
HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../hadoop/hbase/client/AsyncConnectionImpl.java | 16 ++
.../server/replication/ReplicationServer.proto | 32 ++++
.../hadoop/hbase/replication/ReplicationUtils.java | 19 ++
.../hbase/client/AsyncClusterConnection.java | 5 +
.../hbase/client/AsyncClusterConnectionImpl.java | 5 +
.../hbase/client/AsyncReplicationServerAdmin.java | 80 +++++++++
.../hbase/protobuf/ReplicationProtobufUtil.java | 18 ++
.../hadoop/hbase/regionserver/RSRpcServices.java | 8 +-
.../replication/HBaseReplicationEndpoint.java | 57 +++++-
.../replication/ReplicationServerRpcServices.java | 200 +--------------------
.../HBaseInterClusterReplicationEndpoint.java | 7 +-
.../hbase/client/DummyAsyncClusterConnection.java | 5 +
.../replication/TestHBaseReplicationEndpoint.java | 17 +-
.../hbase/replication/TestReplicationServer.java | 43 ++++-
14 files changed, 288 insertions(+), 224 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 8a1ac5a..5a332d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminServic
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
/**
* The implementation of AsyncConnection.
@@ -101,6 +102,8 @@ class AsyncConnectionImpl implements AsyncConnection {
private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, ReplicationServerService.Interface> replStubs =
+ new ConcurrentHashMap<>();
private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
@@ -278,12 +281,25 @@ class AsyncConnectionImpl implements AsyncConnection {
return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}
+ private ReplicationServerService.Interface createReplicationServerStub(ServerName serverName)
+ throws IOException {
+ return ReplicationServerService.newStub(
+ rpcClient.createRpcChannel(serverName, user, rpcTimeout));
+ }
+
AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
getStubKey(AdminService.getDescriptor().getName(), serverName),
() -> createAdminServerStub(serverName));
}
+ ReplicationServerService.Interface getReplicationServerStub(ServerName serverName)
+ throws IOException {
+ return ConcurrentMapUtils.computeIfAbsentEx(replStubs,
+ getStubKey(ReplicationServerService.Interface.class.getSimpleName(), serverName,
+ hostnameCanChange), () -> createReplicationServerStub(serverName));
+ }
+
CompletableFuture<MasterService.Interface> getMasterStub() {
return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
new file mode 100644
index 0000000..ed334c4
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "ReplicationServerProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "server/region/Admin.proto";
+
+service ReplicationServerService {
+ rpc ReplicateWALEntry(ReplicateWALEntryRequest)
+ returns(ReplicateWALEntryResponse);
+}
\ No newline at end of file
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index a786206..7bafbc2 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -212,4 +215,20 @@ public final class ReplicationUtils {
}
return initialValue * HConstants.RETRY_BACKOFF[ntries];
}
+
+ /**
+ * Check whether peer cluster supports replication offload.
+ * @param peerConn connection for peer cluster
+ * @return true if peer cluster version >= 3
+ * @throws IOException exception
+ */
+ public static boolean isPeerClusterSupportReplicationOffload(AsyncConnection peerConn)
+ throws IOException {
+ AsyncAdmin admin = peerConn.getAdmin();
+ String version = FutureUtils.get(admin.getClusterMetrics()).getHBaseVersion();
+ if (Integer.parseInt(version.split("\\.")[0]) >= 3) {
+ return true;
+ }
+ return false;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 92118ac..b6a3b97 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -42,6 +42,11 @@ public interface AsyncClusterConnection extends AsyncConnection {
AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
/**
+ * Get the admin service for the give replication server.
+ */
+ AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName);
+
+ /**
* Get the nonce generator for this connection.
*/
NonceGenerator getNonceGenerator();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 39fc3a2..e4c2ee3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -71,6 +71,11 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
}
@Override
+ public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) {
+ return new AsyncReplicationServerAdmin(serverName, this);
+ }
+
+ @Override
public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
boolean writeFlushWALMarker) {
RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java
new file mode 100644
index 0000000..7511a64
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
+
+/**
+ * A simple wrapper of the {@link ReplicationServerService} for a replication server.
+ * <p/>
+ * Notice that there is no retry, and this is intentional.
+ */
+@InterfaceAudience.Private
+public class AsyncReplicationServerAdmin {
+
+ private final ServerName server;
+
+ private final AsyncConnectionImpl conn;
+
+ AsyncReplicationServerAdmin(ServerName server, AsyncConnectionImpl conn) {
+ this.server = server;
+ this.conn = conn;
+ }
+
+ @FunctionalInterface
+ private interface RpcCall<RESP> {
+ void call(ReplicationServerService.Interface stub, HBaseRpcController controller,
+ RpcCallback<RESP> done);
+ }
+
+ private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
+ CompletableFuture<RESP> future = new CompletableFuture<>();
+ HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
+ try {
+ rpcCall.call(conn.getReplicationServerStub(server), controller, resp -> {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ future.complete(resp);
+ }
+ });
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
+ public CompletableFuture<AdminProtos.ReplicateWALEntryResponse> replicateWALEntry(
+ AdminProtos.ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) {
+ return call((stub, controller, done) -> {
+ controller.setCallTimeout(timeout);
+ stub.replicateWALEntry(controller, request, done);
+ }, cellScanner);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
index e47c929..17f48a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.FutureUtils;
@@ -62,6 +63,23 @@ public class ReplicationProtobufUtil {
}
/**
+ * A helper to replicate a list of WAL entries using replication server admin
+ * @param admin the replication server admin
+ * @param entries Array of WAL entries to be replicated
+ * @param replicationClusterId Id which will uniquely identify source cluster FS client
+ * configurations in the replication configuration directory
+ * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+ * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
+ */
+ public static void replicateWALEntry(AsyncReplicationServerAdmin admin, Entry[] entries,
+ String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
+ int timeout) throws IOException {
+ Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
+ replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
+ FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
+ }
+
+ /**
* Create a new ReplicateWALEntryRequest from a list of WAL entries
* @param entries the WAL entries to be replicated
* @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 54395c3..a4a735b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -257,6 +257,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -1566,8 +1567,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
if (admin) {
bssi.add(new BlockingServiceAndInterface(
- AdminService.newReflectiveBlockingService(this),
- AdminService.BlockingInterface.class));
+ AdminService.newReflectiveBlockingService(this),
+ AdminService.BlockingInterface.class));
+ bssi.add(new BlockingServiceAndInterface(
+ ReplicationServerService.newReflectiveBlockingService(this),
+ ReplicationServerService.BlockingInterface.class));
}
return new org.apache.hbase.thirdparty.com.google.common.collect.
ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 8678685..f38fd08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -26,11 +26,15 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
@@ -278,7 +282,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
}
ServerName serverName =
sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
- return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+ return createSinkPeer(serverName);
}
/**
@@ -340,21 +344,60 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
/**
* Wraps a replication region server sink to provide the ability to identify it.
*/
- public static class SinkPeer {
+ public static abstract class SinkPeer {
private ServerName serverName;
- private AsyncRegionServerAdmin regionServer;
- public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
+ public SinkPeer(ServerName serverName) {
this.serverName = serverName;
- this.regionServer = regionServer;
}
ServerName getServerName() {
return serverName;
}
- public AsyncRegionServerAdmin getRegionServer() {
- return regionServer;
+ public abstract void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+ Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException;
+ }
+
+ public static class RegionServerSinkPeer extends SinkPeer {
+
+ private AsyncRegionServerAdmin regionServer;
+
+ public RegionServerSinkPeer(ServerName serverName,
+ AsyncRegionServerAdmin replicationServer) {
+ super(serverName);
+ this.regionServer = replicationServer;
+ }
+
+ public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+ Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
+ ReplicationProtobufUtil.replicateWALEntry(regionServer, entries, replicationClusterId,
+ sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
+ }
+ }
+
+ public static class ReplicationServerSinkPeer extends SinkPeer {
+
+ private AsyncReplicationServerAdmin replicationServer;
+
+ public ReplicationServerSinkPeer(ServerName serverName,
+ AsyncReplicationServerAdmin replicationServer) {
+ super(serverName);
+ this.replicationServer = replicationServer;
+ }
+
+ public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+ Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
+ ReplicationProtobufUtil.replicateWALEntry(replicationServer, entries, replicationClusterId,
+ sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
+ }
+ }
+
+ private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
+ if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
+ return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
+ } else {
+ return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
index 1b9b699..15d4f8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
@@ -27,14 +27,12 @@ import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
-import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -58,53 +56,11 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -117,7 +73,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
- AdminService.BlockingInterface, PriorityFunction {
+ ReplicationServerService.BlockingInterface, PriorityFunction {
protected static final Logger LOG = LoggerFactory.getLogger(ReplicationServerRpcServices.class);
@@ -256,8 +212,8 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
protected List<BlockingServiceAndInterface> getServices() {
List<BlockingServiceAndInterface> bssi = new ArrayList<>();
bssi.add(new BlockingServiceAndInterface(
- AdminService.newReflectiveBlockingService(this),
- AdminService.BlockingInterface.class));
+ ReplicationServerService.newReflectiveBlockingService(this),
+ ReplicationServerService.BlockingInterface.class));
return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
}
@@ -325,154 +281,6 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
}
}
- @Override
- public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request)
- throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request)
- throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
- GetOnlineRegionRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
- throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request)
- throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
- throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
- throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public CompactionSwitchResponse compactionSwitch(RpcController controller,
- CompactionSwitchRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public CompactRegionResponse compactRegion(RpcController controller,
- CompactRegionRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public ReplicateWALEntryResponse replay(RpcController controller,
- ReplicateWALEntryRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request)
- throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request)
- throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- /**
- * Stop the replication server.
- *
- * @param controller the RPC controller
- * @param request the request
- */
- @Override
- @QosPriority(priority=HConstants.ADMIN_QOS)
- public StopServerResponse stopServer(final RpcController controller,
- final StopServerRequest request) {
- requestCount.increment();
- String reason = request.getReason();
- replicationServer.stop(reason);
- return StopServerResponse.newBuilder().build();
- }
-
- @Override
- public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
- UpdateFavoredNodesRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public UpdateConfigurationResponse updateConfiguration(RpcController controller,
- UpdateConfigurationRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public GetRegionLoadResponse getRegionLoad(RpcController controller,
- GetRegionLoadRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
- ClearCompactionQueuesRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
- ClearRegionBlockCacheRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
- GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public ExecuteProceduresResponse executeProcedures(RpcController controller,
- ExecuteProceduresRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public SlowLogResponses getSlowLogResponses(RpcController controller,
- SlowLogResponseRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public SlowLogResponses getLargeLogResponses(RpcController controller,
- SlowLogResponseRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
- @Override
- public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller,
- ClearSlowLogResponseRequest request) throws ServiceException {
- throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
- }
-
protected AccessChecker getAccessChecker() {
return accessChecker;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index c77f74f..d8517b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -561,11 +560,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
}
sinkPeer = getReplicationSink();
- AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
try {
- ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
- entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
- hfileArchiveDir, timeout);
+ sinkPeer.replicateWALEntry(entries.toArray(new Entry[entries.size()]), replicationClusterId,
+ baseNamespaceDir, hfileArchiveDir, timeout);
if (LOG.isTraceEnabled()) {
LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 8755749..5af4086 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -109,6 +109,11 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
}
@Override
+ public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) {
+ return null;
+ }
+
+ @Override
public NonceGenerator getNonceGenerator() {
return null;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
index 4160141..4182eaf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -28,7 +28,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -99,7 +100,7 @@ public class TestHBaseReplicationEndpoint {
// Sanity check
assertEquals(1, endpoint.getNumSinks());
- SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+ SinkPeer sinkPeer = mockSinkPeer(serverNameA);
endpoint.reportBadSink(sinkPeer);
// Just reporting a bad sink once shouldn't have an effect
assertEquals(1, endpoint.getNumSinks());
@@ -123,7 +124,7 @@ public class TestHBaseReplicationEndpoint {
assertEquals(expected, endpoint.getNumSinks());
ServerName badSinkServer0 = endpoint.getSinkServers().get(0);
- SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class));
+ SinkPeer sinkPeer = mockSinkPeer(badSinkServer0);
for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
endpoint.reportBadSink(sinkPeer);
}
@@ -133,7 +134,7 @@ public class TestHBaseReplicationEndpoint {
// now try a sink that has some successes
ServerName badSinkServer1 = endpoint.getSinkServers().get(0);
- sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class));
+ sinkPeer = mockSinkPeer(badSinkServer1);
for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
endpoint.reportBadSink(sinkPeer);
}
@@ -168,8 +169,8 @@ public class TestHBaseReplicationEndpoint {
ServerName serverNameA = endpoint.getSinkServers().get(0);
ServerName serverNameB = endpoint.getSinkServers().get(1);
- SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
- SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
+ SinkPeer sinkPeerA = mockSinkPeer(serverNameA);
+ SinkPeer sinkPeerB = mockSinkPeer(serverNameB);
for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
endpoint.reportBadSink(sinkPeerA);
@@ -207,4 +208,8 @@ public class TestHBaseReplicationEndpoint {
return null;
}
}
+
+ private SinkPeer mockSinkPeer(ServerName serverName) {
+ return new ReplicationServerSinkPeer(serverName, mock(AsyncReplicationServerAdmin.class));
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
index 6a0ef3d..0ef23f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
@@ -30,14 +30,15 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -116,22 +117,48 @@ public class TestReplicationServer {
TEST_UTIL.deleteTableIfAny(TABLENAME);
}
+ /**
+ * Requests replication server using {@link AsyncReplicationServerAdmin}
+ */
@Test
public void testReplicateWAL() throws Exception {
- AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
- .getRegionServer().getAsyncClusterConnection();
- AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(replicationServer.getServerName());
+ AsyncClusterConnection conn =
+ TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
+ AsyncReplicationServerAdmin replAdmin =
+ conn.getReplicationServerAdmin(replicationServer.getServerName());
+
+ ReplicationServerSinkPeer sinkPeer =
+ new ReplicationServerSinkPeer(replicationServer.getServerName(), replAdmin);
+ replicateWALEntryAndVerify(sinkPeer);
+ }
+
+ /**
+ * Requests region server using {@link AsyncReplicationServerAdmin}
+ */
+ @Test
+ public void testReplicateWAL2() throws Exception {
+ AsyncClusterConnection conn =
+ TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
+ ServerName rs = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
+ .getRegionServer().getServerName();
+ AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs);
+
+ ReplicationServerSinkPeer
+ sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin);
+ replicateWALEntryAndVerify(sinkPeer);
+ }
+ private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception {
Entry[] entries = new Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
}
- ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries, replicationClusterId,
- baseNamespaceDir, hfileArchiveDir, 1000);
+ sinkPeer.replicateWALEntry(entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir,
+ 1000);
+ Table table = TEST_UTIL.getConnection().getTable(TABLENAME);
for (int i = 0; i < BATCH_SIZE; i++) {
- Table table = TEST_UTIL.getConnection().getTable(TABLENAME);
Result result = table.get(new Get(Bytes.toBytes(i)));
Cell cell = result.getColumnLatestCell(Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY));
assertNotNull(cell);