You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/02/23 01:57:48 UTC
[hbase] 08/10: HBASE-24999 Master manages ReplicationServers (#2579)
This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit bd13d14bb59e5c4d1c69c19316f4bceb54ef7703
Author: XinSun <dd...@gmail.com>
AuthorDate: Wed Oct 28 18:59:57 2020 +0800
HBASE-24999 Master manages ReplicationServers (#2579)
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../server/master/ReplicationServerStatus.proto | 34 ++++
.../org/apache/hadoop/hbase/master/HMaster.java | 10 +
.../hadoop/hbase/master/MasterRpcServices.java | 37 +++-
.../apache/hadoop/hbase/master/MasterServices.java | 5 +
.../hbase/master/ReplicationServerManager.java | 204 ++++++++++++++++++++
.../replication/HBaseReplicationEndpoint.java | 148 ++++++--------
.../hbase/replication/HReplicationServer.java | 214 ++++++++++++++++++++-
.../HBaseInterClusterReplicationEndpoint.java | 1 -
.../regionserver/ReplicationSyncUp.java | 4 +-
.../hbase/master/MockNoopMasterServices.java | 5 +
.../hbase/replication/TestReplicationBase.java | 2 +
.../hbase/replication/TestReplicationServer.java | 57 +++++-
12 files changed, 619 insertions(+), 102 deletions(-)
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/ReplicationServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/ReplicationServerStatus.proto
new file mode 100644
index 0000000..d39a043
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/ReplicationServerStatus.proto
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "ReplicationServerStatusProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "server/master/RegionServerStatus.proto";
+
+service ReplicationServerStatusService {
+
+ rpc ReplicationServerReport(RegionServerReportRequest)
+ returns(RegionServerReportResponse);
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 138a43f..5e8de56 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -303,6 +303,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// manager of assignment nodes in zookeeper
private AssignmentManager assignmentManager;
+ // server manager to deal with replication server info
+ private ReplicationServerManager replicationServerManager;
/**
* Cache for the meta region replica's locations. Also tracks their changes to avoid stale
@@ -866,6 +868,8 @@ public class HMaster extends HRegionServer implements MasterServices {
.collect(Collectors.toList());
this.assignmentManager.setupRIT(ritList);
+ this.replicationServerManager = new ReplicationServerManager(this);
+
// Start RegionServerTracker with listing of servers found with exiting SCPs -- these should
// be registered in the deadServers set -- and with the list of servernames out on the
// filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out).
@@ -1024,6 +1028,7 @@ public class HMaster extends HRegionServer implements MasterServices {
this.hbckChore = new HbckChore(this);
getChoreService().scheduleChore(hbckChore);
this.serverManager.startChore();
+ this.replicationServerManager.startChore();
// Only for rolling upgrade, where we need to migrate the data in namespace table to meta table.
if (!waitForNamespaceOnline()) {
@@ -1283,6 +1288,11 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
+ public ReplicationServerManager getReplicationServerManager() {
+ return this.replicationServerManager;
+ }
+
+ @Override
public MasterFileSystem getMasterFileSystem() {
return this.fileSystemManager;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index df5f0b0..93f368f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -401,6 +401,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Trans
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatusProtos.ReplicationServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsService;
@@ -412,7 +413,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.
public class MasterRpcServices extends RSRpcServices implements
MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
LockService.BlockingInterface, HbckService.BlockingInterface,
- ClientMetaService.BlockingInterface {
+ ClientMetaService.BlockingInterface, ReplicationServerStatusService.BlockingInterface {
private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
private static final Logger AUDITLOG =
@@ -546,7 +547,7 @@ public class MasterRpcServices extends RSRpcServices implements
*/
@Override
protected List<BlockingServiceAndInterface> getServices() {
- List<BlockingServiceAndInterface> bssi = new ArrayList<>(5);
+ List<BlockingServiceAndInterface> bssi = new ArrayList<>(6);
bssi.add(new BlockingServiceAndInterface(
MasterService.newReflectiveBlockingService(this),
MasterService.BlockingInterface.class));
@@ -559,6 +560,9 @@ public class MasterRpcServices extends RSRpcServices implements
HbckService.BlockingInterface.class));
bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
ClientMetaService.BlockingInterface.class));
+ bssi.add(new BlockingServiceAndInterface(
+ ReplicationServerStatusService.newReflectiveBlockingService(this),
+ ReplicationServerStatusService.BlockingInterface.class));
bssi.addAll(super.getServices());
return bssi;
}
@@ -3414,4 +3418,33 @@ public class MasterRpcServices extends RSRpcServices implements
}
return builder.build();
}
+
+ @Override
+ public RegionServerReportResponse replicationServerReport(RpcController controller,
+ RegionServerReportRequest request) throws ServiceException {
+ try {
+ master.checkServiceStarted();
+ int versionNumber = 0;
+ String version = "0.0.0";
+ VersionInfo versionInfo = VersionInfoUtil.getCurrentClientVersionInfo();
+ if (versionInfo != null) {
+ version = versionInfo.getVersion();
+ versionNumber = VersionInfoUtil.getVersionNumber(versionInfo);
+ }
+ ClusterStatusProtos.ServerLoad sl = request.getLoad();
+ ServerName serverName = ProtobufUtil.toServerName(request.getServer());
+ ServerMetrics oldMetrics = master.getReplicationServerManager().getServerMetrics(serverName);
+ ServerMetrics newMetrics =
+ ServerMetricsBuilder.toServerMetrics(serverName, versionNumber, version, sl);
+ master.getReplicationServerManager().serverReport(serverName, newMetrics);
+ if (sl != null && master.metricsMaster != null) {
+ // Up our metrics.
+ master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
+ - (oldMetrics != null ? oldMetrics.getRequestCount() : 0));
+ }
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ return RegionServerReportResponse.newBuilder().build();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 3f7dc02..bb8fdca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -103,6 +103,11 @@ public interface MasterServices extends Server {
ServerManager getServerManager();
/**
+ * @return Master's {@link ReplicationServerManager} instance.
+ */
+ ReplicationServerManager getReplicationServerManager();
+
+ /**
* @return Master's instance of {@link ExecutorService}
*/
ExecutorService getExecutorService();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ReplicationServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ReplicationServerManager.java
new file mode 100644
index 0000000..273b7f2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ReplicationServerManager.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.ServerMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The ReplicationServerManager class manages info about replication servers.
+ * <p>
+ * Maintains lists of online and dead servers.
+ * <p>
+ * Servers are distinguished in two different ways. A given server has a
+ * location, specified by hostname and port, and of which there can only be one
+ * online at any given time. A server instance is specified by the location
+ * (hostname and port) as well as the startcode (timestamp from when the server
+ * was started). This is used to differentiate a restarted instance of a given
+ * server from the original instance.
+ */
+@InterfaceAudience.Private
+public class ReplicationServerManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationServerManager.class);
+
+ public static final String ONLINE_SERVER_REFRESH_INTERVAL =
+ "hbase.master.replication.server.refresh.interval";
+ public static final int ONLINE_SERVER_REFRESH_INTERVAL_DEFAULT = 60 * 1000; // 1 minute
+
+ private final MasterServices master;
+
+ /** Map of registered servers to their current load */
+ private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
+ new ConcurrentSkipListMap<>();
+
+ private OnlineServerRefresher onlineServerRefresher;
+ private int refreshPeriod;
+
+ /**
+ * Constructor.
+ */
+ public ReplicationServerManager(final MasterServices master) {
+ this.master = master;
+ }
+
+ /**
+ * Start the chore that periodically expires replication servers which have stopped reporting.
+ */
+ public void startChore() {
+ Configuration conf = master.getConfiguration();
+ refreshPeriod = conf.getInt(ONLINE_SERVER_REFRESH_INTERVAL,
+ ONLINE_SERVER_REFRESH_INTERVAL_DEFAULT);
+ onlineServerRefresher = new OnlineServerRefresher("ReplicationServerRefresher", refreshPeriod);
+ master.getChoreService().scheduleChore(onlineServerRefresher);
+ }
+
+ /**
+ * Stop the ReplicationServerManager by cancelling the refresh chore, if it was started.
+ */
+ public void stop() {
+ if (onlineServerRefresher != null) {
+ onlineServerRefresher.cancel();
+ }
+ }
+
+ public void serverReport(ServerName sn, ServerMetrics sl) {
+ if (null == this.onlineServers.replace(sn, sl)) {
+ if (!checkAndRecordNewServer(sn, sl)) {
+ LOG.info("ReplicationServerReport ignored, could not record the server: {}", sn);
+ }
+ }
+ }
+
+ /**
+ * Check whether a server with the same host and port already exists;
+ * if not, or if the existing one does not have a larger start code, record the new server.
+ *
+ * @param serverName the server to check and record
+ * @param sl the server load on the server
+ * @return true if the server is recorded, otherwise, false
+ */
+ private boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
+ ServerName existingServer = null;
+ synchronized (this.onlineServers) {
+ existingServer = findServerWithSameHostnamePort(serverName);
+ if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
+ LOG.info("ReplicationServer serverName={} rejected; we already have {} registered with "
+ + "same hostname and port", serverName, existingServer);
+ return false;
+ }
+ recordNewServer(serverName, sl);
+ // Note that we assume that same ts means same server, and don't expire in that case.
+ if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
+ LOG.info("Triggering server recovery; existingServer {} looks stale, new server: {}",
+ existingServer, serverName);
+ expireServer(existingServer);
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Assumes onlineServers is locked.
+ * @return ServerName with matching hostname and port.
+ */
+ private ServerName findServerWithSameHostnamePort(final ServerName serverName) {
+ ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(),
+ Long.MAX_VALUE);
+
+ ServerName r = onlineServers.lowerKey(end);
+ if (r != null && ServerName.isSameAddress(r, serverName)) {
+ return r;
+ }
+ return null;
+ }
+
+ /**
+ * Assumes onlineServers is locked.
+ */
+ private void recordNewServer(final ServerName serverName, final ServerMetrics sl) {
+ LOG.info("Registering ReplicationServer={}", serverName);
+ this.onlineServers.put(serverName, sl);
+ }
+
+ /**
+ * Assumes onlineServers is locked.
+ * Expire the passed server. Remove it from list of online servers
+ */
+ public void expireServer(final ServerName serverName) {
+ LOG.info("Expiring ReplicationServer={}", serverName);
+ onlineServers.remove(serverName);
+ }
+
+ /**
+ * @return Read-only map of servers to serverinfo
+ */
+ public Map<ServerName, ServerMetrics> getOnlineServers() {
+ // Presumption is that iterating the returned Map is OK.
+ synchronized (this.onlineServers) {
+ return Collections.unmodifiableMap(this.onlineServers);
+ }
+ }
+
+ /**
+ * @return A copy of the internal list of online servers.
+ */
+ public List<ServerName> getOnlineServersList() {
+ return new ArrayList<>(this.onlineServers.keySet());
+ }
+
+ /**
+ * @param serverName server name
+ * @return ServerMetrics if serverName is known else null
+ */
+ public ServerMetrics getServerMetrics(final ServerName serverName) {
+ return this.onlineServers.get(serverName);
+ }
+
+ private class OnlineServerRefresher extends ScheduledChore {
+
+ public OnlineServerRefresher(String name, int p) {
+ super(name, master, p, 60 * 1000); // delay one minute before first execute
+ }
+
+ @Override
+ protected void chore() {
+ synchronized (onlineServers) {
+ List<ServerName> servers = getOnlineServersList();
+ servers.forEach(s -> {
+ ServerMetrics metrics = onlineServers.get(s);
+ if (metrics.getReportTimestamp() + refreshPeriod < System.currentTimeMillis()) {
+ expireServer(s);
+ }
+ });
+ }
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index e788d8c..115df76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.replication;
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import java.io.IOException;
import java.util.ArrayList;
@@ -32,16 +32,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -67,6 +66,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
/**
* A {@link BaseReplicationEndpoint} for replication endpoints whose
* target cluster is an HBase cluster.
+ * <p>
+ * Supports two ways of fetching sink servers: fetching replication servers by asking the peer
+ * master, and fetching region servers by listening to ZK.
+ * Fetching replication servers from the master takes priority. If the slave cluster does not
+ * support it (version < 3.x) or an exception occurs, region servers are fetched via ZK instead.
+ * The ZK listener is always registered, but ZK events are ignored if replication servers are used.
+ * </p>
*/
@InterfaceAudience.Private
public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
@@ -74,9 +80,6 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
- public static final String FETCH_SERVERS_USE_ZK_CONF_KEY =
- "hbase.replication.fetch.servers.usezk";
-
public static final String FETCH_SERVERS_INTERVAL_CONF_KEY =
"hbase.replication.fetch.servers.interval";
public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000; // 10 mins
@@ -112,10 +115,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
private List<ServerName> sinkServers = new ArrayList<>(0);
- private AsyncClusterConnection peerConnection;
- private boolean fetchServersUseZk = false;
+ private volatile boolean fetchServersUseZk = false;
private FetchServersChore fetchServersChore;
- private int shortOperationTimeout;
+ private int operationTimeout;
/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
@@ -136,6 +138,8 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
this.badSinkThreshold =
ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
this.badReportCounts = Maps.newHashMap();
+ this.operationTimeout = ctx.getLocalConfiguration().getInt(
+ HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
}
protected void disconnect() {
@@ -144,20 +148,12 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
zkw.close();
}
}
- if (this.conn != null) {
- try {
- this.conn.close();
- this.conn = null;
- } catch (IOException e) {
- LOG.warn("{} Failed to close the connection", ctx.getPeerId());
- }
- }
if (fetchServersChore != null) {
fetchServersChore.cancel();
}
- if (peerConnection != null) {
+ if (conn != null) {
try {
- peerConnection.close();
+ conn.close();
} catch (IOException e) {
LOG.warn("Attempt to close peerConnection failed.", e);
}
@@ -192,27 +188,10 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
}
@Override
- protected synchronized void doStart() {
- this.shortOperationTimeout = ctx.getLocalConfiguration().getInt(
- HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
+ protected void doStart() {
try {
- if (ctx.getLocalConfiguration().getBoolean(FETCH_SERVERS_USE_ZK_CONF_KEY, false)) {
- fetchServersUseZk = true;
- } else {
- try {
- if (ReplicationUtils.isPeerClusterSupportReplicationOffload(getPeerConnection())) {
- fetchServersChore = new FetchServersChore(ctx.getServer(), this);
- ctx.getServer().getChoreService().scheduleChore(fetchServersChore);
- fetchServersUseZk = false;
- } else {
- fetchServersUseZk = true;
- }
- } catch (Throwable t) {
- fetchServersUseZk = true;
- LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.",
- ctx.getPeerId(), t);
- }
- }
+ fetchServersChore = new FetchServersChore(ctx.getServer(), this);
+ ctx.getServer().getChoreService().scheduleChore(fetchServersChore);
reloadZkWatcher();
connectPeerCluster();
notifyStarted();
@@ -255,9 +234,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
}
zkw = new ZKWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
- if (fetchServersUseZk) {
- zkw.registerListener(new PeerRegionServerListener(this));
- }
+ zkw.registerListener(new PeerRegionServerListener(this));
}
}
@@ -283,38 +260,25 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
}
/**
- * Get the connection to peer cluster
- * @return connection to peer cluster
- * @throws IOException If anything goes wrong connecting
- */
- private synchronized AsyncClusterConnection getPeerConnection() throws IOException {
- if (peerConnection == null) {
- Configuration conf = ctx.getConfiguration();
- peerConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
- UserProvider.instantiate(conf).getCurrent());
- }
- return peerConnection;
- }
-
- /**
* Get the list of all the servers that are responsible for replication sink
* from the specified peer master
- * @return list of server addresses or an empty list if the slave is unavailable
+ * @return list of server addresses
*/
- protected List<ServerName> fetchSlavesAddresses() {
+ protected List<ServerName> fetchSlavesAddresses() throws IOException {
try {
- AsyncClusterConnection peerConn = getPeerConnection();
- ServerName master = FutureUtils.get(peerConn.getAdmin().getMaster());
+ ServerName master = FutureUtils.get(conn.getAdmin().getMaster());
MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
- peerConn.getRpcClient()
- .createBlockingRpcChannel(master, User.getCurrent(), shortOperationTimeout));
+ conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), operationTimeout));
ListReplicationSinkServersResponse resp = masterStub
.listReplicationSinkServers(null, ListReplicationSinkServersRequest.newBuilder().build());
return ProtobufUtil.toServerNameList(resp.getServerNameList());
- } catch (ServiceException | IOException e) {
+ } catch (ServiceException e) {
LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e);
+ throw ProtobufUtil.getRemoteException(e);
+ } catch (IOException e) {
+ LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e);
+ throw e;
}
- return Collections.emptyList();
}
/**
@@ -344,20 +308,34 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
return addresses;
}
- protected synchronized void chooseSinks() {
- List<ServerName> slaveAddresses = Collections.emptyList();
- if (fetchServersUseZk) {
+ protected void chooseSinks() {
+ List<ServerName> slaveAddresses = Collections.EMPTY_LIST;
+ boolean useZk = fetchServersUseZk;
+ try {
+ if (!useZk || ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
+ useZk = false;
+ slaveAddresses = fetchSlavesAddresses();
+ } else {
+ useZk = true;
+ }
+ } catch (Throwable t) {
+ LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.", ctx.getPeerId(), t);
+ useZk = true;
+ }
+ if (useZk) {
slaveAddresses = fetchSlavesAddressesByZK();
- } else {
- slaveAddresses = fetchSlavesAddresses();
}
+
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
}
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
- this.sinkServers = slaveAddresses.subList(0, numSinks);
- badReportCounts.clear();
+ synchronized (this) {
+ this.fetchServersUseZk = useZk;
+ this.sinkServers = slaveAddresses.subList(0, numSinks);
+ badReportCounts.clear();
+ }
}
protected synchronized int getNumSinks() {
@@ -368,16 +346,18 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
* Get a randomly-chosen replication sink to replicate to.
* @return a replication sink to replicate to
*/
- protected synchronized SinkPeer getReplicationSink() throws IOException {
- if (sinkServers.isEmpty()) {
- LOG.info("Current list of sinks is out of date or empty, updating");
- chooseSinks();
- }
- if (sinkServers.isEmpty()) {
- throw new IOException("No replication sinks are available");
+ protected SinkPeer getReplicationSink() throws IOException {
+ ServerName serverName;
+ synchronized (this) {
+ if (sinkServers.isEmpty()) {
+ LOG.info("Current list of sinks is out of date or empty, updating");
+ chooseSinks();
+ }
+ if (sinkServers.isEmpty()) {
+ throw new IOException("No replication sinks are available");
+ }
+ serverName = sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
}
- ServerName serverName =
- sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
return createSinkPeer(serverName);
}
@@ -438,7 +418,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
@Override
public synchronized void nodeChildrenChanged(String path) {
- if (path.equals(regionServerListNode)) {
+ if (replicationEndpoint.fetchServersUseZk && path.equals(regionServerListNode)) {
LOG.info("Detected change to peer region servers, fetching updated list");
replicationEndpoint.chooseSinks();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index 31dec0c..4b53bb7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
+import java.lang.management.MemoryUsage;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,22 +29,37 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.ReplicationService;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatusProtos.ReplicationServerStatusService;
+
/**
* HReplicationServer which is responsible to all replication stuff. It checks in with
* the HMaster. There are many HReplicationServers in a single HBase deployment.
@@ -86,9 +102,14 @@ public class HReplicationServer extends Thread implements Server {
// A sleeper that sleeps for msgInterval.
protected final Sleeper sleeper;
+ private final int shortOperationTimeout;
+
// zookeeper connection and watcher
protected final ZKWatcher zooKeeper;
+ // master address tracker
+ private final MasterAddressTracker masterAddressTracker;
+
/**
* The asynchronous cluster connection to be shared by services.
*/
@@ -98,6 +119,17 @@ public class HReplicationServer extends Thread implements Server {
protected final ReplicationServerRpcServices rpcServices;
+ // Stub to do replication server status calls against the master.
+ private volatile ReplicationServerStatusService.BlockingInterface rssStub;
+
+ // RPC client. Used to make the stub above that does replication server status reporting.
+ private RpcClient rpcClient;
+
+ /**
+ * ChoreService used to schedule tasks that we want to run periodically
+ */
+ private ChoreService choreService;
+
public HReplicationServer(final Configuration conf) throws IOException {
TraceUtil.initTracer(conf);
try {
@@ -116,16 +148,22 @@ public class HReplicationServer extends Thread implements Server {
this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000);
this.sleeper = new Sleeper(this.msgInterval, this);
+ this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
+
// Some unit tests don't need a cluster, so no zookeeper at all
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
// Open connection to zookeeper and set primary watcher
zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
rpcServices.isa.getPort(), this, false);
+ masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
+ masterAddressTracker.start();
} else {
zooKeeper = null;
+ masterAddressTracker = null;
}
-
this.rpcServices.start(zooKeeper);
+ this.choreService = new ChoreService(getName(), true);
} catch (Throwable t) {
// Make sure we log the exception. HReplicationServer is often started via reflection and the
// cause of failed startup is lost.
@@ -150,6 +188,7 @@ public class HReplicationServer extends Thread implements Server {
} catch (Throwable e) {
abort("Fatal exception during initialization", e);
}
+
try {
setupReplication();
startReplicationService();
@@ -161,6 +200,7 @@ public class HReplicationServer extends Thread implements Server {
while (!isStopped()) {
long now = System.currentTimeMillis();
if ((now - lastMsg) >= msgInterval) {
+ tryReplicationServerReport(lastMsg, now);
lastMsg = System.currentTimeMillis();
}
if (!isStopped() && !isAborted()) {
@@ -177,6 +217,22 @@ public class HReplicationServer extends Thread implements Server {
abort(t.getMessage(), t);
}
+ if (this.asyncClusterConnection != null) {
+ try {
+ this.asyncClusterConnection.close();
+ } catch (IOException e) {
+ // Although the {@link Closeable} interface throws an {@link
+ // IOException}, in reality, the implementation would never do that.
+ LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e);
+ }
+ }
+ if (rssStub != null) {
+ rssStub = null;
+ }
+ if (rpcClient != null) {
+ this.rpcClient.close();
+ }
+
if (this.zooKeeper != null) {
this.zooKeeper.close();
}
@@ -204,11 +260,13 @@ public class HReplicationServer extends Thread implements Server {
private void preRegistrationInitialization() {
try {
setupClusterConnection();
+ // Setup RPC client for master communication
+ this.rpcClient = asyncClusterConnection.getRpcClient();
} catch (Throwable t) {
// Call stop if error or process will stick around for ever since server
// puts up non-daemon threads.
this.rpcServices.stop();
- abort("Initialization of RS failed. Hence aborting RS.", t);
+ abort("Initialization of ReplicationServer failed. Hence aborting ReplicationServer.", t);
}
}
@@ -272,7 +330,7 @@ public class HReplicationServer extends Thread implements Server {
@Override
public ChoreService getChoreService() {
- return null;
+ return this.choreService;
}
@Override
@@ -329,7 +387,7 @@ public class HReplicationServer extends Thread implements Server {
throws IOException {
// read in the name of the sink replication class from the config file.
String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
- HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+ HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
server.replicationSinkService = newReplicationInstance(sinkClassname,
ReplicationSinkService.class, conf, server);
@@ -388,4 +446,152 @@ public class HReplicationServer extends Thread implements Server {
protected boolean setAbortRequested() {
return abortRequested.compareAndSet(false, true);
}
+
+ /**
+ * Sends one load report to the master through the cached status stub, creating the stub
+ * first when none is cached. If no master connection can be obtained the report is skipped.
+ * Any failure other than a YouAreDeadException drops the cached stub and reconnects.
+ * @param reportStartTime start of the reporting window, forwarded into the server load
+ * @param reportEndTime end of the reporting window, forwarded into the server load
+ * @throws IOException if the master answers the report with a YouAreDeadException
+ */
+ private void tryReplicationServerReport(long reportStartTime, long reportEndTime)
+ throws IOException {
+ ReplicationServerStatusService.BlockingInterface stub = rssStub;
+ if (stub == null) {
+ // No cached stub yet: look the master up (refreshing its address) and connect.
+ ServerName master = createReplicationServerStatusStub(true);
+ stub = rssStub;
+ if (master == null || stub == null) {
+ // Still no usable connection; skip this report.
+ return;
+ }
+ }
+ ClusterStatusProtos.ServerLoad load = buildServerLoad(reportStartTime, reportEndTime);
+ try {
+ RegionServerReportRequest report = RegionServerReportRequest.newBuilder()
+ .setServer(ProtobufUtil.toServerName(this.serverName))
+ .setLoad(load)
+ .build();
+ stub.replicationServerReport(null, report);
+ } catch (ServiceException se) {
+ IOException remote = ProtobufUtil.getRemoteException(se);
+ if (remote instanceof YouAreDeadException) {
+ // This will be caught and handled as a fatal error in run()
+ throw remote;
+ }
+ // Forget the stub we just used, unless it has already been replaced in the meantime.
+ if (rssStub == stub) {
+ rssStub = null;
+ }
+ // Couldn't connect to the master, get location from zk and reconnect
+ // Method blocks until new master is found or we are stopped
+ createReplicationServerStatusStub(true);
+ }
+ }
+
+ /**
+ * Assembles the ServerLoad message for a report: total request count, heap usage, the
+ * reporting window and, when a sink service is present, its replication sink load.
+ * @param reportStartTime start of the reporting window, copied into the message
+ * @param reportEndTime end of the reporting window, copied into the message
+ * @return the built ServerLoad
+ */
+ private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
+ final MemoryUsage heap = MemorySizeUtil.safeGetHeapMemoryUsage();
+ // -1 stands in for both values when no heap usage could be obtained.
+ final long usedBytes = heap == null ? -1L : heap.getUsed();
+ final long maxBytes = heap == null ? -1L : heap.getMax();
+
+ ClusterStatusProtos.ServerLoad.Builder load = ClusterStatusProtos.ServerLoad.newBuilder()
+ .setTotalNumberOfRequests(rpcServices.requestCount.sum())
+ .setUsedHeapMB((int) (usedBytes / 1024 / 1024))
+ .setMaxHeapMB((int) (maxBytes / 1024 / 1024))
+ .setReportStartTime(reportStartTime)
+ .setReportEndTime(reportEndTime);
+
+ // Replication load comes from the sink service; refresh first to report the latest value.
+ ReplicationSinkService sinkService = getReplicationSinkService();
+ ReplicationLoad sinkLoad =
+ sinkService == null ? null : sinkService.refreshAndGetReplicationLoad();
+ if (sinkLoad != null) {
+ load.setReplLoadSink(sinkLoad.getReplicationLoadSink());
+ }
+ return load.build();
+ }
+
+ /**
+ * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
+ * connection, the current rssStub must be null; when a stub is already cached the method
+ * returns the tracked master address without reconnecting. Otherwise it retries, sleeping
+ * 200ms between attempts, until a stub has been created or this server is stopped.
+ * You can break from this block by requesting the server stop.
+ * @param refresh If true then master address will be read from ZK, otherwise use cached data.
+ * Forced to true once a lookup has found no master.
+ * @return the master address that was looked up, or null if no master was found before the
+ * server stopped. NOTE(review): if the server stops right after a failed connection
+ * attempt, the last looked-up address is returned while rssStub stays null, so
+ * callers should check rssStub as well.
+ */
+ private synchronized ServerName createReplicationServerStatusStub(boolean refresh) {
+ if (rssStub != null) {
+ return masterAddressTracker.getMasterAddress();
+ }
+ ServerName sn = null;
+ long previousLogTime = 0;
+ ReplicationServerStatusService.BlockingInterface intRssStub = null;
+ boolean interrupted = false; // remembered here, re-asserted on the thread in the finally block
+ try {
+ while (keepLooping()) {
+ sn = this.masterAddressTracker.getMasterAddress(refresh);
+ if (sn == null) {
+ if (!keepLooping()) {
+ // give up with no connection.
+ LOG.debug("No master found and cluster is stopped; bailing out");
+ return null;
+ }
+ if (System.currentTimeMillis() > (previousLogTime + 1000)) { // log at most once a second
+ LOG.debug("No master found; retry");
+ previousLogTime = System.currentTimeMillis();
+ }
+ refresh = true; // let's try pull it from ZK directly
+ if (sleepInterrupted(200)) {
+ interrupted = true;
+ }
+ continue;
+ }
+
+ try {
+ BlockingRpcChannel channel =
+ this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
+ shortOperationTimeout);
+ intRssStub = ReplicationServerStatusService.newBlockingStub(channel);
+ break;
+ } catch (IOException e) {
+ if (System.currentTimeMillis() > (previousLogTime + 1000)) { // log at most once a second
+ e = e instanceof RemoteException ?
+ ((RemoteException)e).unwrapRemoteException() : e;
+ if (e instanceof ServerNotRunningYetException) {
+ LOG.info("Master isn't available yet, retrying");
+ } else {
+ LOG.warn("Unable to connect to master. Retrying. Error was:", e);
+ }
+ previousLogTime = System.currentTimeMillis();
+ }
+ if (sleepInterrupted(200)) {
+ interrupted = true;
+ }
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ this.rssStub = intRssStub; // stays null when the loop ended without a successful connection
+ return sn;
+ }
+
+ /**
+ * @return True if the caller should keep looping, i.e. this server has not been stopped;
+ * false once it has been stopped and the loop must be abandoned.
+ */
+ private boolean keepLooping() {
+ return !this.stopped;
+ }
+
+ /**
+ * Sleeps for the given time and reports, rather than propagates, an interruption.
+ * @param millis how long to sleep, in milliseconds
+ * @return true if the sleep was cut short by an interrupt, false if it ran to completion
+ */
+ private static boolean sleepInterrupted(long millis) {
+ try {
+ Thread.sleep(millis);
+ return false;
+ } catch (InterruptedException e) {
+ // The interrupt status is not restored here; only the returned flag records it.
+ LOG.warn("Interrupted while sleeping");
+ return true;
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index d8517b0..f0448b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index a43be29..55b20c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -104,11 +104,13 @@ public class ReplicationSyncUp extends Configured implements Tool {
class DummyServer implements Server {
String hostname;
ZKWatcher zkw;
+ ChoreService choreService;
DummyServer(ZKWatcher zkw) {
// a unique name in case the first run fails
hostname = System.currentTimeMillis() + ".SyncUpTool.replication.org";
this.zkw = zkw;
+ this.choreService = new ChoreService("ReplicationSyncUpDummyServer", true);
}
DummyServer(String hostname) {
@@ -160,7 +162,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
@Override
public ChoreService getChoreService() {
- return null;
+ return choreService;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index ed2edb0..52ca278 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -154,6 +154,11 @@ public class MockNoopMasterServices implements MasterServices {
}
@Override
+ public ReplicationServerManager getReplicationServerManager() {
+ return null;
+ }
+
+ @Override
public ZKWatcher getZooKeeper() {
return null;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index eca0d67..78fec0e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.replication;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -198,6 +199,7 @@ public class TestReplicationBase {
conf.setFloat("replication.source.ratio", 1.0f);
conf.setBoolean("replication.source.eof.autorecovery", true);
conf.setLong("hbase.serial.replication.waiting.ms", 100);
+ conf.setLong(HBASE_CLIENT_OPERATION_TIMEOUT, 5000);
}
static void configureClusters(HBaseTestingUtility util1,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
index 0ef23f2..30660c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
@@ -17,7 +17,10 @@
*/
package org.apache.hadoop.hbase.replication;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.master.ReplicationServerManager.ONLINE_SERVER_REFRESH_INTERVAL;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -38,7 +41,9 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.ReplicationServerManager;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -74,6 +79,7 @@ public class TestReplicationServer {
private static HMaster MASTER;
private static HReplicationServer replicationServer;
+ private static ServerName replicationServerName;
private static Path baseNamespaceDir;
private static Path hfileArchiveDir;
@@ -86,14 +92,11 @@ public class TestReplicationServer {
@BeforeClass
public static void beforeClass() throws Exception {
+ CONF.setLong(HBASE_CLIENT_OPERATION_TIMEOUT, 1000);
+ CONF.setLong(ONLINE_SERVER_REFRESH_INTERVAL, 10000);
TEST_UTIL.startMiniCluster();
MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
-
- replicationServer = new HReplicationServer(CONF);
- replicationServer.start();
-
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
- TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline());
Path rootDir = CommonFSUtils.getRootDir(CONF);
baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR));
@@ -108,6 +111,11 @@ public class TestReplicationServer {
@Before
public void before() throws Exception {
+ replicationServer = new HReplicationServer(CONF);
+ replicationServer.start();
+ TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline());
+ replicationServerName = replicationServer.getServerName();
+
TEST_UTIL.createTable(TABLENAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLENAME);
}
@@ -115,6 +123,11 @@ public class TestReplicationServer {
@After
public void after() throws IOException {
TEST_UTIL.deleteTableIfAny(TABLENAME);
+ if (!replicationServer.isStopped()) {
+ replicationServer.stop("test");
+ }
+ replicationServer = null;
+ replicationServerName = null;
}
/**
@@ -125,10 +138,10 @@ public class TestReplicationServer {
AsyncClusterConnection conn =
TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
AsyncReplicationServerAdmin replAdmin =
- conn.getReplicationServerAdmin(replicationServer.getServerName());
+ conn.getReplicationServerAdmin(replicationServerName);
ReplicationServerSinkPeer sinkPeer =
- new ReplicationServerSinkPeer(replicationServer.getServerName(), replAdmin);
+ new ReplicationServerSinkPeer(replicationServerName, replAdmin);
replicateWALEntryAndVerify(sinkPeer);
}
@@ -143,12 +156,11 @@ public class TestReplicationServer {
.getRegionServer().getServerName();
AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs);
- ReplicationServerSinkPeer
- sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin);
+ ReplicationServerSinkPeer sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin);
replicateWALEntryAndVerify(sinkPeer);
}
- private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception {
+ private void replicateWALEntryAndVerify(SinkPeer sinkPeer) throws Exception {
Entry[] entries = new Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
@@ -175,4 +187,29 @@ public class TestReplicationServer {
edit.add(new KeyValue(row, Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY), timestamp, row));
return new WAL.Entry(key, edit);
}
+
+ /**
+ * Checks that the master's ReplicationServerManager learns about the replication server and
+ * that the request count it holds catches up with the server's own counter.
+ */
+ @Test
+ public void testReplicationServerReport() throws Exception {
+ ReplicationServerManager manager = MASTER.getReplicationServerManager();
+ assertNotNull(manager);
+ // wait until the master holds metrics for our replication server
+ TEST_UTIL.waitFor(60000, () -> !manager.getOnlineServers().isEmpty()
+ && null != manager.getServerMetrics(replicationServerName));
+ // put data via replication server
+ testReplicateWAL();
+ // wait until a non-zero request count is seen locally and the master's copy matches it
+ TEST_UTIL.waitFor(60000, () -> replicationServer.rpcServices.requestCount.sum() > 0
+ && replicationServer.rpcServices.requestCount.sum()
+ == manager.getServerMetrics(replicationServerName).getRequestCount());
+ }
+
+ /**
+ * Checks that a stopped replication server is eventually dropped from the master's
+ * ReplicationServerManager.
+ */
+ @Test
+ public void testReplicationServerExpire() throws Exception {
+ ReplicationServerManager replicationServerManager = MASTER.getReplicationServerManager();
+ // wait until the master holds metrics for our replication server
+ TEST_UTIL.waitFor(60000, () -> !replicationServerManager.getOnlineServers().isEmpty()
+ && null != replicationServerManager.getServerMetrics(replicationServerName));
+
+ replicationServer.stop("test");
+
+ // Use a lambda rather than getOnlineServers()::isEmpty: a bound method reference evaluates
+ // its receiver only once, so every poll would inspect the same Map instance.
+ TEST_UTIL.waitFor(180000, 1000,
+ () -> replicationServerManager.getOnlineServers().isEmpty());
+ assertNull(replicationServerManager.getServerMetrics(replicationServerName));
+ }
}