You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/02/23 01:57:47 UTC
[hbase] 07/10: HBASE-24684 Fetch ReplicationSink servers list from HMaster instead o… (#2077)
This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 29adcce81e7783a39597b942e3c78977b2871cb9
Author: XinSun <dd...@gmail.com>
AuthorDate: Sun Sep 20 10:54:43 2020 +0800
HBASE-24684 Fetch ReplicationSink servers list from HMaster instead o… (#2077)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../src/main/protobuf/server/master/Master.proto | 12 +-
.../hadoop/hbase/coprocessor/MasterObserver.java | 16 +++
.../org/apache/hadoop/hbase/master/HMaster.java | 5 +
.../hadoop/hbase/master/MasterCoprocessorHost.java | 18 +++
.../hadoop/hbase/master/MasterRpcServices.java | 21 ++++
.../apache/hadoop/hbase/master/MasterServices.java | 6 +
.../replication/HBaseReplicationEndpoint.java | 140 +++++++++++++++++++--
.../hbase/master/MockNoopMasterServices.java | 5 +
.../replication/TestHBaseReplicationEndpoint.java | 5 +
.../replication/TestReplicationFetchServers.java | 106 ++++++++++++++++
.../TestGlobalReplicationThrottler.java | 4 +
...stRegionReplicaReplicationEndpointNoMaster.java | 2 +
12 files changed, 327 insertions(+), 13 deletions(-)
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index 118ce77..7dec566 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -717,6 +717,13 @@ message BalancerDecisionsResponse {
repeated BalancerDecision balancer_decision = 1;
}
+message ListReplicationSinkServersRequest {
+}
+
+message ListReplicationSinkServersResponse {
+ repeated ServerName server_name = 1;
+}
+
service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@@ -1146,10 +1153,13 @@ service MasterService {
returns (RenameRSGroupResponse);
rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest)
- returns (UpdateRSGroupConfigResponse);
+ returns (UpdateRSGroupConfigResponse);
rpc GetLogEntries(LogRequest)
returns(LogEntry);
+
+ rpc ListReplicationSinkServers(ListReplicationSinkServersRequest)
+ returns (ListReplicationSinkServersResponse);
}
// HBCK Service definitions.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index ac35caa..ec009cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -1782,4 +1782,20 @@ public interface MasterObserver {
default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
String userName, List<Permission> permissions) throws IOException {
}
+
+ /**
+ * Called before getting servers for replication sink.
+ * @param ctx the coprocessor instance's environment
+ */
+ default void preListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ }
+
+ /**
+ * Called after getting servers for replication sink.
+ * @param ctx the coprocessor instance's environment
+ */
+ default void postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 74f199c..138a43f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -3782,4 +3782,9 @@ public class HMaster extends HRegionServer implements MasterServices {
public MetaLocationSyncer getMetaLocationSyncer() {
return metaLocationSyncer;
}
+
+ @Override
+ public List<ServerName> listReplicationSinkServers() throws IOException {
+ return this.serverManager.getOnlineServersList();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 01d1a62..f775eba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -2038,4 +2038,22 @@ public class MasterCoprocessorHost
}
});
}
+
+ public void preListReplicationSinkServers() throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+ @Override
+ public void call(MasterObserver observer) throws IOException {
+ observer.preListReplicationSinkServers(this);
+ }
+ });
+ }
+
+ public void postListReplicationSinkServers() throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+ @Override
+ public void call(MasterObserver observer) throws IOException {
+ observer.postListReplicationSinkServers(this);
+ }
+ });
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 8f2f0da..df5f0b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -263,6 +263,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamesp
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
@@ -3393,4 +3395,23 @@ public class MasterRpcServices extends RSRpcServices implements
.addAllBalancerDecision(balancerDecisions).build();
}
+ public ListReplicationSinkServersResponse listReplicationSinkServers(
+ RpcController controller, ListReplicationSinkServersRequest request)
+ throws ServiceException {
+ ListReplicationSinkServersResponse.Builder builder =
+ ListReplicationSinkServersResponse.newBuilder();
+ try {
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preListReplicationSinkServers();
+ }
+ builder.addAllServerName(master.listReplicationSinkServers().stream()
+ .map(ProtobufUtil::toServerName).collect(Collectors.toList()));
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postListReplicationSinkServers();
+ }
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return builder.build();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index f24ecd4..3f7dc02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -577,4 +577,10 @@ public interface MasterServices extends Server {
* We need to get this in MTP to tell the syncer the new meta replica count.
*/
MetaLocationSyncer getMetaLocationSyncer();
+
+ /**
+ * Get a list of servers' addresses for replication sink.
+ * @return a list of servers' address
+ */
+ List<ServerName> listReplicationSinkServers() throws IOException;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index f38fd08..e788d8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.replication;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -27,18 +30,22 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.zookeeper.ZKListener;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -50,6 +57,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
/**
* A {@link BaseReplicationEndpoint} for replication endpoints whose
@@ -61,6 +74,13 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
+ public static final String FETCH_SERVERS_USE_ZK_CONF_KEY =
+ "hbase.replication.fetch.servers.usezk";
+
+ public static final String FETCH_SERVERS_INTERVAL_CONF_KEY =
+ "hbase.replication.fetch.servers.interval";
+ public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000; // 10 mins
+
private ZKWatcher zkw = null;
private final Object zkwLock = new Object();
@@ -92,6 +112,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
private List<ServerName> sinkServers = new ArrayList<>(0);
+ private AsyncClusterConnection peerConnection;
+ private boolean fetchServersUseZk = false;
+ private FetchServersChore fetchServersChore;
+ private int shortOperationTimeout;
+
/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
* Connection implementations, or initialize it in a different way, so defining createConnection
@@ -127,6 +152,16 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
LOG.warn("{} Failed to close the connection", ctx.getPeerId());
}
}
+ if (fetchServersChore != null) {
+ fetchServersChore.cancel();
+ }
+ if (peerConnection != null) {
+ try {
+ peerConnection.close();
+ } catch (IOException e) {
+ LOG.warn("Attempt to close peerConnection failed.", e);
+ }
+ }
}
/**
@@ -157,8 +192,27 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
}
@Override
- protected void doStart() {
+ protected synchronized void doStart() {
+ this.shortOperationTimeout = ctx.getLocalConfiguration().getInt(
+ HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
try {
+ if (ctx.getLocalConfiguration().getBoolean(FETCH_SERVERS_USE_ZK_CONF_KEY, false)) {
+ fetchServersUseZk = true;
+ } else {
+ try {
+ if (ReplicationUtils.isPeerClusterSupportReplicationOffload(getPeerConnection())) {
+ fetchServersChore = new FetchServersChore(ctx.getServer(), this);
+ ctx.getServer().getChoreService().scheduleChore(fetchServersChore);
+ fetchServersUseZk = false;
+ } else {
+ fetchServersUseZk = true;
+ }
+ } catch (Throwable t) {
+ fetchServersUseZk = true;
+ LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.",
+ ctx.getPeerId(), t);
+ }
+ }
reloadZkWatcher();
connectPeerCluster();
notifyStarted();
@@ -201,7 +255,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
}
zkw = new ZKWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
- zkw.registerListener(new PeerRegionServerListener(this));
+ if (fetchServersUseZk) {
+ zkw.registerListener(new PeerRegionServerListener(this));
+ }
}
}
@@ -227,11 +283,46 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
}
/**
+ * Get the connection to peer cluster
+ * @return connection to peer cluster
+ * @throws IOException If anything goes wrong connecting
+ */
+ private synchronized AsyncClusterConnection getPeerConnection() throws IOException {
+ if (peerConnection == null) {
+ Configuration conf = ctx.getConfiguration();
+ peerConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
+ UserProvider.instantiate(conf).getCurrent());
+ }
+ return peerConnection;
+ }
+
+ /**
+ * Get the list of all the servers that are responsible for replication sink
+ * from the specified peer master
+ * @return list of server addresses or an empty list if the slave is unavailable
+ */
+ protected List<ServerName> fetchSlavesAddresses() {
+ try {
+ AsyncClusterConnection peerConn = getPeerConnection();
+ ServerName master = FutureUtils.get(peerConn.getAdmin().getMaster());
+ MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
+ peerConn.getRpcClient()
+ .createBlockingRpcChannel(master, User.getCurrent(), shortOperationTimeout));
+ ListReplicationSinkServersResponse resp = masterStub
+ .listReplicationSinkServers(null, ListReplicationSinkServersRequest.newBuilder().build());
+ return ProtobufUtil.toServerNameList(resp.getServerNameList());
+ } catch (ServiceException | IOException e) {
+ LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e);
+ }
+ return Collections.emptyList();
+ }
+
+ /**
* Get the list of all the region servers from the specified peer
*
* @return list of region server addresses or an empty list if the slave is unavailable
*/
- protected List<ServerName> fetchSlavesAddresses() {
+ protected List<ServerName> fetchSlavesAddressesByZK() {
List<String> children = null;
try {
synchronized (zkwLock) {
@@ -254,7 +345,12 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
}
protected synchronized void chooseSinks() {
- List<ServerName> slaveAddresses = fetchSlavesAddresses();
+ List<ServerName> slaveAddresses = Collections.emptyList();
+ if (fetchServersUseZk) {
+ slaveAddresses = fetchSlavesAddressesByZK();
+ } else {
+ slaveAddresses = fetchSlavesAddresses();
+ }
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
}
@@ -285,6 +381,14 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
return createSinkPeer(serverName);
}
+ private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
+ if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
+ return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
+ } else {
+ return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+ }
+ }
+
/**
* Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
* failed). If a single SinkPeer is reported as bad more than
@@ -393,11 +497,23 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
}
}
- private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
- if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
- return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
- } else {
- return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+ /**
+ * Chore that will fetch the list of servers from peer master.
+ */
+ public static class FetchServersChore extends ScheduledChore {
+
+ private HBaseReplicationEndpoint endpoint;
+
+ public FetchServersChore(Server server, HBaseReplicationEndpoint endpoint) {
+ super("Peer-" + endpoint.ctx.getPeerId() + "-FetchServersChore", server,
+ server.getConfiguration()
+ .getInt(FETCH_SERVERS_INTERVAL_CONF_KEY, DEFAULT_FETCH_SERVERS_INTERVAL));
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ protected void chore() {
+ endpoint.chooseSinks();
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 933addf..ed2edb0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -520,4 +520,9 @@ public class MockNoopMasterServices implements MasterServices {
public MetaLocationSyncer getMetaLocationSyncer() {
return null;
}
+
+ @Override
+ public List<ServerName> listReplicationSinkServers() throws IOException {
+ return null;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
index 4182eaf..6765794 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -199,6 +199,11 @@ public class TestHBaseReplicationEndpoint {
}
@Override
+ public List<ServerName> fetchSlavesAddressesByZK() {
+ return regionServers;
+ }
+
+ @Override
public boolean replicate(ReplicateContext replicateContext) {
return false;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
new file mode 100644
index 0000000..9ceacee
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationFetchServers extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationFetchServers.class);
+
+ private static AtomicBoolean fetchFlag = new AtomicBoolean(false);
+
+ public static class MyObserver implements MasterCoprocessor, MasterObserver {
+
+ @Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public void postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx) {
+ fetchFlag.set(true);
+ }
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ CONF2.set(MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
+ TestReplicationBase.setUpBeforeClass();
+ }
+
+ @Before
+ public void beforeMethod() {
+ fetchFlag.set(false);
+ }
+
+ @Test
+ public void testMasterListReplicationPeerServers() throws IOException, ServiceException {
+ AsyncClusterConnection conn = UTIL2.getAsyncConnection();
+ ServerName master = UTIL2.getAdmin().getMaster();
+ MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
+ conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000));
+ ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers(
+ null, ListReplicationSinkServersRequest.newBuilder().build());
+ List<ServerName> servers = ProtobufUtil.toServerNameList(resp.getServerNameList());
+ assertFalse(servers.isEmpty());
+ assertTrue(fetchFlag.get());
+ }
+
+ @Test
+ public void testPutData() throws IOException {
+ htable1.put(new Put(row).addColumn(famName, famName, row));
+ UTIL2.waitFor(30000L, () -> !htable2.get(new Get(row)).isEmpty());
+ assertTrue(fetchFlag.get());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java
index f528bda..ef6811e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java
@@ -118,6 +118,10 @@ public class TestGlobalReplicationThrottler {
@AfterClass
public static void tearDownAfterClass() throws Exception {
+ Admin admin1 = utility1.getAdmin();
+ admin1.removeReplicationPeer("peer1");
+ admin1.removeReplicationPeer("peer2");
+ admin1.removeReplicationPeer("peer3");
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index ee1ae5f..c676e30 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -256,11 +256,13 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
+ when(context.getLocalConfiguration()).thenReturn(HTU.getConfiguration());
when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
when(context.getServer()).thenReturn(rs0);
when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
replicator.init(context);
replicator.startAsync();
+ HTU.waitFor(30000, replicator::isRunning);
//load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);