You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/03 12:53:22 UTC
[hbase] 04/09: HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 57202f10a745063dda0c31b04285423a6ffe3135
Author: zhangduo <zh...@apache.org>
AuthorDate: Tue Jan 1 21:27:14 2019 +0800
HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint
---
.../hbase/client/AsyncRegionServerAdmin.java | 14 +++++---
.../hbase/protobuf/ReplicationProtbufUtil.java | 35 ++++++++++---------
.../HBaseInterClusterReplicationEndpoint.java | 31 +++++++++--------
.../regionserver/ReplicationSinkManager.java | 40 ++++++++--------------
.../hbase/replication/SyncReplicationTestBase.java | 12 +++----
.../regionserver/TestReplicationSinkManager.java | 21 +++++-------
6 files changed, 74 insertions(+), 79 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index 9accd89..b9141a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
@@ -94,9 +95,9 @@ public class AsyncRegionServerAdmin {
void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
}
- private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+ private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
CompletableFuture<RESP> future = new CompletableFuture<>();
- HBaseRpcController controller = conn.rpcControllerFactory.newController();
+ HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
try {
rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
@@ -115,6 +116,10 @@ public class AsyncRegionServerAdmin {
return future;
}
+ private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+ return call(rpcCall, null);
+ }
+
public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
}
@@ -154,8 +159,9 @@ public class AsyncRegionServerAdmin {
}
public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
- ReplicateWALEntryRequest request) {
- return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
+ ReplicateWALEntryRequest request, CellScanner cellScanner) {
+ return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done),
+ cellScanner);
}
public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index c1b3911..74fad26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -20,51 +20,54 @@ package org.apache.hadoop.hbase.protobuf;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.io.SizedCellScanner;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@InterfaceAudience.Private
public class ReplicationProtbufUtil {
+
/**
- * A helper to replicate a list of WAL entries using admin protocol.
- * @param admin Admin service
+ * A helper to replicate a list of WAL entries using region server admin
+ * @param admin the region server admin
* @param entries Array of WAL entries to be replicated
* @param replicationClusterId Id which will uniquely identify source cluster FS client
* configurations in the replication configuration directory
* @param sourceBaseNamespaceDir Path to source cluster base namespace directory
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
- * @throws java.io.IOException
*/
- public static void replicateWALEntry(final AdminService.BlockingInterface admin,
- final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
- Path sourceHFileArchiveDir) throws IOException {
- Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
- buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
- sourceHFileArchiveDir);
- HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
+ public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
+ String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
+ throws IOException {
+ Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
+ entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
try {
- admin.replicateWALEntry(controller, p.getFirst());
- } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
- throw ProtobufUtil.getServiceException(e);
+ admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(e);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7db53aa..0359096 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -39,7 +39,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -48,13 +47,16 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -65,8 +67,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-
/**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
* implementation for replicating to another HBase cluster.
@@ -85,8 +85,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
- private ClusterConnection conn;
- private Configuration localConf;
+ private AsyncClusterConnection conn;
private Configuration conf;
// How long should we sleep for each retry
private long sleepForRetries;
@@ -117,7 +116,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
public void init(Context context) throws IOException {
super.init(context);
this.conf = HBaseConfiguration.create(ctx.getConfiguration());
- this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
decorateConf();
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
@@ -132,12 +130,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
- this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
+ this.conn =
+ ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
- this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+ this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
@@ -284,9 +283,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
private void reconnectToPeerCluster() {
- ClusterConnection connection = null;
+ AsyncClusterConnection connection = null;
try {
- connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
+ connection =
+ ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
} catch (IOException ioe) {
LOG.warn("Failed to create connection for peer cluster", ioe);
}
@@ -367,7 +367,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
continue;
}
- if (this.conn == null || this.conn.isClosed()) {
+ if (this.conn == null) {
reconnectToPeerCluster();
}
try {
@@ -480,10 +480,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
entriesHashCode, entries.size(), size, replicationClusterId);
}
sinkPeer = replicationSinkMgr.getReplicationSink();
- BlockingInterface rrs = sinkPeer.getRegionServer();
+ AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
try {
- ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
- replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ ReplicationProtbufUtil.replicateWALEntry(rsAdmin,
+ entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
+ hfileArchiveDir);
LOG.trace("Completed replicating batch {}", entriesHashCode);
} catch (IOException e) {
LOG.trace("Failed replicating batch {}", entriesHashCode, e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 3cd7884..21b07ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -21,11 +21,11 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -35,8 +35,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-
/**
* Maintains a collection of peers to replicate to, and randomly selects a
* single peer to replicate to per set of data to replicate. Also handles
@@ -61,9 +59,7 @@ public class ReplicationSinkManager {
static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
- private final Connection conn;
-
- private final String peerClusterId;
+ private final AsyncClusterConnection conn;
private final HBaseReplicationEndpoint endpoint;
@@ -77,8 +73,6 @@ public class ReplicationSinkManager {
// replication sinks is refreshed
private final int badSinkThreshold;
- private final Random random;
-
// A timestamp of the last time the list of replication peers changed
private long lastUpdateToPeers;
@@ -88,26 +82,22 @@ public class ReplicationSinkManager {
/**
* Instantiate for a single replication peer cluster.
* @param conn connection to the peer cluster
- * @param peerClusterId identifier of the peer cluster
* @param endpoint replication endpoint for inter cluster replication
* @param conf HBase configuration, used for determining replication source ratio and bad peer
* threshold
*/
- public ReplicationSinkManager(ClusterConnection conn, String peerClusterId,
- HBaseReplicationEndpoint endpoint, Configuration conf) {
+ public ReplicationSinkManager(AsyncClusterConnection conn, HBaseReplicationEndpoint endpoint,
+ Configuration conf) {
this.conn = conn;
- this.peerClusterId = peerClusterId;
this.endpoint = endpoint;
this.badReportCounts = Maps.newHashMap();
this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
- this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
- DEFAULT_BAD_SINK_THRESHOLD);
- this.random = new Random();
+ this.badSinkThreshold =
+ conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
}
/**
* Get a randomly-chosen replication sink to replicate to.
- *
* @return a replication sink to replicate to
*/
public synchronized SinkPeer getReplicationSink() throws IOException {
@@ -119,8 +109,8 @@ public class ReplicationSinkManager {
if (sinks.isEmpty()) {
throw new IOException("No replication sinks are available");
}
- ServerName serverName = sinks.get(random.nextInt(sinks.size()));
- return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName));
+ ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
+ return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
}
/**
@@ -160,7 +150,7 @@ public class ReplicationSinkManager {
*/
public synchronized void chooseSinks() {
List<ServerName> slaveAddresses = endpoint.getRegionServers();
- Collections.shuffle(slaveAddresses, random);
+ Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
sinks = slaveAddresses.subList(0, numSinks);
lastUpdateToPeers = System.currentTimeMillis();
@@ -182,9 +172,9 @@ public class ReplicationSinkManager {
*/
public static class SinkPeer {
private ServerName serverName;
- private AdminService.BlockingInterface regionServer;
+ private AsyncRegionServerAdmin regionServer;
- public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
+ public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
this.serverName = serverName;
this.regionServer = regionServer;
}
@@ -193,10 +183,8 @@ public class ReplicationSinkManager {
return serverName;
}
- public AdminService.BlockingInterface getRegionServer() {
+ public AsyncRegionServerAdmin getRegionServer() {
return regionServer;
}
-
}
-
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index f373590..e0d112d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -250,19 +250,19 @@ public class SyncReplicationTestBase {
protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility,
boolean expectedRejection) throws Exception {
HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
- ClusterConnection connection = regionServer.getClusterConnection();
+ AsyncClusterConnection connection = regionServer.getAsyncClusterConnection();
Entry[] entries = new Entry[10];
for (int i = 0; i < entries.length; i++) {
entries[i] =
new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
}
if (!expectedRejection) {
- ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
- entries, null, null, null);
+ ReplicationProtbufUtil.replicateWALEntry(
+ connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
} else {
try {
- ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
- entries, null, null, null);
+ ReplicationProtbufUtil.replicateWALEntry(
+ connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
fail("Should throw IOException when sync-replication state is in A or DA");
} catch (DoNotRetryIOException e) {
assertTrue(e.getMessage().contains("Reject to apply to sink cluster"));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 39dabb4..60afd40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -25,7 +25,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -37,8 +38,6 @@ import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-
@Category({ReplicationTests.class, SmallTests.class})
public class TestReplicationSinkManager {
@@ -46,16 +45,14 @@ public class TestReplicationSinkManager {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
- private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
-
private HBaseReplicationEndpoint replicationEndpoint;
private ReplicationSinkManager sinkManager;
@Before
public void setUp() {
replicationEndpoint = mock(HBaseReplicationEndpoint.class);
- sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class),
- PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
+ sinkManager = new ReplicationSinkManager(mock(AsyncClusterConnection.class),
+ replicationEndpoint, new Configuration());
}
@Test
@@ -100,7 +97,7 @@ public class TestReplicationSinkManager {
// Sanity check
assertEquals(1, sinkManager.getNumSinks());
- SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
+ SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
sinkManager.reportBadSink(sinkPeer);
@@ -131,7 +128,7 @@ public class TestReplicationSinkManager {
ServerName serverName = sinkManager.getSinksForTesting().get(0);
- SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+ SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
@@ -147,7 +144,7 @@ public class TestReplicationSinkManager {
//
serverName = sinkManager.getSinksForTesting().get(0);
- sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+ sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
sinkManager.reportBadSink(sinkPeer);
}
@@ -188,8 +185,8 @@ public class TestReplicationSinkManager {
ServerName serverNameA = sinkList.get(0);
ServerName serverNameB = sinkList.get(1);
- SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
- SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
+ SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+ SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
sinkManager.reportBadSink(sinkPeerA);