You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/09/23 00:36:13 UTC
[hbase] branch master updated: HBASE-25074 Refactor
ReplicationSinkManager: reduce code and make it easy to understand (#2430)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 7e910a5 HBASE-25074 Refactor ReplicationSinkManager: reduce code and make it easy to understand (#2430)
7e910a5 is described below
commit 7e910a573f30a9995cb779fa55a6911629ac2e5f
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Wed Sep 23 08:30:43 2020 +0800
HBASE-25074 Refactor ReplicationSinkManager: reduce code and make it easy to understand (#2430)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../replication/HBaseReplicationEndpoint.java | 215 +++++++++++++++------
.../HBaseInterClusterReplicationEndpoint.java | 51 +----
.../regionserver/ReplicationSinkManager.java | 193 ------------------
.../replication/TestHBaseReplicationEndpoint.java | 210 ++++++++++++++++++++
.../regionserver/TestReplicationSinkManager.java | 210 --------------------
.../TestSerialReplicationEndpoint.java | 10 +-
6 files changed, 382 insertions(+), 507 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 3cde0d5..850a791 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -22,8 +22,16 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
@@ -38,6 +46,9 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+
/**
* A {@link BaseReplicationEndpoint} for replication endpoints whose
* target cluster is an HBase cluster.
@@ -50,8 +61,58 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
private ZKWatcher zkw = null;
- private List<ServerName> regionServers = new ArrayList<>(0);
- private long lastRegionServerUpdate;
+ protected Configuration conf;
+
+ protected AsyncClusterConnection conn;
+
+ /**
+ * Default maximum number of times a replication sink can be reported as bad before
+ * it will no longer be provided as a sink for replication without the pool of
+ * replication sinks being refreshed.
+ */
+ public static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
+
+ /**
+ * Default ratio of the total number of peer cluster region servers to consider
+ * replicating to.
+ */
+ public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
+
+ // Ratio of total number of potential peer region servers to be used
+ private float ratio;
+
+ // Maximum number of times a sink can be reported as bad before the pool of
+ // replication sinks is refreshed
+ private int badSinkThreshold;
+ // Count of "bad replication sink" reports per peer sink
+ private Map<ServerName, Integer> badReportCounts;
+
+ private List<ServerName> sinkServers = new ArrayList<>(0);
+
+ /*
+ * Some implementations of HBaseInterClusterReplicationEndpoint may need to instantiate a
+ * different Connection implementation, or to initialize it in a different way, so
+ * createConnection is defined as protected to allow overriding.
+ */
+ protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
+ return ClusterConnectionFactory.createAsyncClusterConnection(conf,
+ null, User.getCurrent());
+ }
+
+ @Override
+ public void init(Context context) throws IOException {
+ super.init(context);
+ this.conf = HBaseConfiguration.create(ctx.getConfiguration());
+ // TODO: Is this connection replication specific? If so, we should make it particular to
+ // replication and apply replication-specific settings, such as the compression or codec to
+ // use when passing Cells.
+ this.conn = createConnection(this.conf);
+ this.ratio =
+ ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
+ this.badSinkThreshold =
+ ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
+ this.badReportCounts = Maps.newHashMap();
+ }
protected synchronized void disconnect() {
if (zkw != null) {
@@ -63,7 +124,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
* A private method used to re-establish a zookeeper session with a peer cluster.
* @param ke
*/
- protected void reconnect(KeeperException ke) {
+ private void reconnect(KeeperException ke) {
if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
|| ke instanceof AuthFailedException) {
String clusterKey = ctx.getPeerConfig().getClusterKey();
@@ -118,22 +179,16 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
}
/**
- * Get the ZK connection to this peer
- * @return zk connection
- */
- protected synchronized ZKWatcher getZkw() {
- return zkw;
- }
-
- /**
* Closes the current ZKW (if not null) and creates a new one
* @throws IOException If anything goes wrong connecting
*/
- synchronized void reloadZkWatcher() throws IOException {
- if (zkw != null) zkw.close();
+ private synchronized void reloadZkWatcher() throws IOException {
+ if (zkw != null) {
+ zkw.close();
+ }
zkw = new ZKWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
- getZkw().registerListener(new PeerRegionServerListener(this));
+ zkw.registerListener(new PeerRegionServerListener(this));
}
@Override
@@ -150,13 +205,19 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
/**
* Get the list of all the region servers from the specified peer
- * @param zkw zk connection to use
+ *
* @return list of region server addresses or an empty list if the slave is unavailable
*/
- protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw)
- throws KeeperException {
- List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
- zkw.getZNodePaths().rsZNode);
+ protected List<ServerName> fetchSlavesAddresses() {
+ List<String> children = null;
+ try {
+ children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
+ } catch (KeeperException ke) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fetch slaves addresses failed", ke);
+ }
+ reconnect(ke);
+ }
if (children == null) {
return Collections.emptyList();
}
@@ -167,43 +228,70 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
return addresses;
}
+ protected synchronized void chooseSinks() {
+ List<ServerName> slaveAddresses = fetchSlavesAddresses();
+ if (slaveAddresses.isEmpty()) {
+ LOG.warn("No sinks available at peer. Will not be able to replicate");
+ }
+ Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
+ int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
+ this.sinkServers = slaveAddresses.subList(0, numSinks);
+ badReportCounts.clear();
+ }
+
+ protected synchronized int getNumSinks() {
+ return sinkServers.size();
+ }
+
/**
- * Get a list of all the addresses of all the available region servers
- * for this peer cluster, or an empty list if no region servers available at peer cluster.
- * @return list of addresses
+ * Get a randomly-chosen replication sink to replicate to.
+ * @return a replication sink to replicate to
*/
- // Synchronize peer cluster connection attempts to avoid races and rate
- // limit connections when multiple replication sources try to connect to
- // the peer cluster. If the peer cluster is down we can get out of control
- // over time.
- public synchronized List<ServerName> getRegionServers() {
- try {
- setRegionServers(fetchSlavesAddresses(this.getZkw()));
- } catch (KeeperException ke) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Fetch slaves addresses failed", ke);
- }
- reconnect(ke);
+ protected synchronized SinkPeer getReplicationSink() throws IOException {
+ if (sinkServers.isEmpty()) {
+ LOG.info("Current list of sinks is out of date or empty, updating");
+ chooseSinks();
}
- return regionServers;
+ if (sinkServers.isEmpty()) {
+ throw new IOException("No replication sinks are available");
+ }
+ ServerName serverName =
+ sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
+ return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
}
/**
- * Set the list of region servers for that peer
- * @param regionServers list of addresses for the region servers
+ * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
+ * failed). If a single SinkPeer is reported as bad more than
+ * replication.bad.sink.threshold times, it will be removed
+ * from the pool of potential replication targets.
+ *
+ * @param sinkPeer The SinkPeer that had a failed replication attempt on it
*/
- public synchronized void setRegionServers(List<ServerName> regionServers) {
- this.regionServers = regionServers;
- lastRegionServerUpdate = System.currentTimeMillis();
+ protected synchronized void reportBadSink(SinkPeer sinkPeer) {
+ ServerName serverName = sinkPeer.getServerName();
+ int badReportCount = badReportCounts.compute(serverName, (k, v) -> v == null ? 1 : v + 1);
+ if (badReportCount > badSinkThreshold) {
+ this.sinkServers.remove(serverName);
+ if (sinkServers.isEmpty()) {
+ chooseSinks();
+ }
+ }
}
/**
- * Get the timestamp at which the last change occurred to the list of region servers to replicate
- * to.
- * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
+ * Report that a {@code SinkPeer} successfully replicated a chunk of data.
+ *
+ * @param sinkPeer
+ * The SinkPeer that had a successful replication attempt on it
*/
- public long getLastRegionServerUpdate() {
- return lastRegionServerUpdate;
+ protected synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
+ badReportCounts.remove(sinkPeer.getServerName());
+ }
+
+ @VisibleForTesting
+ List<ServerName> getSinkServers() {
+ return sinkServers;
}
/**
@@ -214,22 +302,39 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
private final HBaseReplicationEndpoint replicationEndpoint;
private final String regionServerListNode;
- public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) {
- super(replicationPeer.getZkw());
- this.replicationEndpoint = replicationPeer;
- this.regionServerListNode = replicationEndpoint.getZkw().getZNodePaths().rsZNode;
+ public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
+ super(endpoint.zkw);
+ this.replicationEndpoint = endpoint;
+ this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode;
}
@Override
public synchronized void nodeChildrenChanged(String path) {
if (path.equals(regionServerListNode)) {
- try {
- LOG.info("Detected change to peer region servers, fetching updated list");
- replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw()));
- } catch (KeeperException e) {
- LOG.error("Error reading slave addresses", e);
- }
+ LOG.info("Detected change to peer region servers, fetching updated list");
+ replicationEndpoint.chooseSinks();
}
}
}
+
+ /**
+ * Wraps a replication region server sink to provide the ability to identify it.
+ */
+ public static class SinkPeer {
+ private ServerName serverName;
+ private AsyncRegionServerAdmin regionServer;
+
+ public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
+ this.serverName = serverName;
+ this.regionServer = regionServer;
+ }
+
+ ServerName getServerName() {
+ return serverName;
+ }
+
+ public AsyncRegionServerAdmin getRegionServer() {
+ return regionServer;
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 4e0669c..b6e1f69 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -41,7 +41,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CellUtil;
@@ -60,7 +59,6 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -100,8 +98,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY =
"hbase.replication.drop.on.deleted.columnfamily";
- private AsyncClusterConnection conn;
- private Configuration conf;
// How long should we sleep for each retry
private long sleepForRetries;
// Maximum number of retries before taking bold actions
@@ -114,8 +110,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private int replicationRpcLimit;
//Metrics for this source
private MetricsSource metrics;
- // Handles connecting to peer region servers
- private ReplicationSinkManager replicationSinkMgr;
private boolean peersSelected = false;
private String replicationClusterId = "";
private ThreadPoolExecutor exec;
@@ -130,25 +124,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
//Initialising as 0 to guarantee at least one logging message
private long lastSinkFetchTime = 0;
- /*
- * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
- * Connection implementations, or initialize it in a different way, so defining createConnection
- * as protected for possible overridings.
- */
- protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
- return ClusterConnectionFactory.createAsyncClusterConnection(conf,
- null, User.getCurrent());
- }
-
- /*
- * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
- * ReplicationSinkManager implementations, or initialize it in a different way,
- * so defining createReplicationSinkManager as protected for possible overridings.
- */
- protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) {
- return new ReplicationSinkManager(conn, this, this.conf);
- }
-
@Override
public void init(Context context) throws IOException {
super.init(context);
@@ -171,8 +146,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
- // ReplicationQueueInfo parses the peerId out of the znode for us
- this.replicationSinkMgr = createReplicationSinkManager(conn);
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
@@ -211,14 +184,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
private void connectToPeers() {
- getRegionServers();
-
int sleepMultiplier = 1;
-
// Connect to peer cluster first, unless we have to stop
- while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
- replicationSinkMgr.chooseSinks();
- if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
+ while (this.isRunning() && getNumSinks() == 0) {
+ chooseSinks();
+ if (this.isRunning() && getNumSinks() == 0) {
if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
sleepMultiplier++;
}
@@ -253,7 +223,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
- int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
+ int numSinks = Math.max(getNumSinks(), 1);
int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
List<List<Entry>> entryLists =
Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
@@ -513,7 +483,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
peersSelected = true;
}
- int numSinks = replicationSinkMgr.getNumSinks();
+ int numSinks = getNumSinks();
if (numSinks == 0) {
if((System.currentTimeMillis() - lastSinkFetchTime) >= (maxRetriesMultiplier*1000)) {
LOG.warn(
@@ -561,7 +531,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
} else {
LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
ioe);
- replicationSinkMgr.chooseSinks();
+ chooseSinks();
}
} else {
if (ioe instanceof SocketTimeoutException) {
@@ -574,7 +544,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.socketTimeoutMultiplier);
} else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
- replicationSinkMgr.chooseSinks();
+ chooseSinks();
} else {
LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
}
@@ -629,7 +599,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
}
- sinkPeer = replicationSinkMgr.getReplicationSink();
+ sinkPeer = getReplicationSink();
AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
try {
ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
@@ -644,10 +614,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
throw e;
}
- replicationSinkMgr.reportSinkSuccess(sinkPeer);
+ reportSinkSuccess(sinkPeer);
} catch (IOException ioe) {
if (sinkPeer != null) {
- replicationSinkMgr.reportBadSink(sinkPeer);
+ reportBadSink(sinkPeer);
}
throw ioe;
}
@@ -683,5 +653,4 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private String logPeerId(){
return "[Source for peer " + this.ctx.getPeerId() + "]:";
}
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
deleted file mode 100644
index db12dc0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
-import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-
-/**
- * Maintains a collection of peers to replicate to, and randomly selects a
- * single peer to replicate to per set of data to replicate. Also handles
- * keeping track of peer availability.
- */
-@InterfaceAudience.Private
-public class ReplicationSinkManager {
-
- private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkManager.class);
-
- /**
- * Default maximum number of times a replication sink can be reported as bad before
- * it will no longer be provided as a sink for replication without the pool of
- * replication sinks being refreshed.
- */
- static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
-
- /**
- * Default ratio of the total number of peer cluster region servers to consider
- * replicating to.
- */
- static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
-
-
- private final AsyncClusterConnection conn;
-
- private final HBaseReplicationEndpoint endpoint;
-
- // Count of "bad replication sink" reports per peer sink
- private final Map<ServerName, Integer> badReportCounts;
-
- // Ratio of total number of potential peer region servers to be used
- private final float ratio;
-
- // Maximum number of times a sink can be reported as bad before the pool of
- // replication sinks is refreshed
- private final int badSinkThreshold;
-
- // A timestamp of the last time the list of replication peers changed
- private long lastUpdateToPeers;
-
- // The current pool of sinks to which replication can be performed
- private List<ServerName> sinks = Lists.newArrayList();
-
- /**
- * Instantiate for a single replication peer cluster.
- * @param conn connection to the peer cluster
- * @param endpoint replication endpoint for inter cluster replication
- * @param conf HBase configuration, used for determining replication source ratio and bad peer
- * threshold
- */
- public ReplicationSinkManager(AsyncClusterConnection conn, HBaseReplicationEndpoint endpoint,
- Configuration conf) {
- this.conn = conn;
- this.endpoint = endpoint;
- this.badReportCounts = Maps.newHashMap();
- this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
- this.badSinkThreshold =
- conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
- }
-
- /**
- * Get a randomly-chosen replication sink to replicate to.
- * @return a replication sink to replicate to
- */
- public synchronized SinkPeer getReplicationSink() throws IOException {
- if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
- LOG.info("Current list of sinks is out of date or empty, updating");
- chooseSinks();
- }
-
- if (sinks.isEmpty()) {
- throw new IOException("No replication sinks are available");
- }
- ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
- return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
- }
-
- /**
- * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
- * failed). If a single SinkPeer is reported as bad more than
- * replication.bad.sink.threshold times, it will be removed
- * from the pool of potential replication targets.
- *
- * @param sinkPeer
- * The SinkPeer that had a failed replication attempt on it
- */
- public synchronized void reportBadSink(SinkPeer sinkPeer) {
- ServerName serverName = sinkPeer.getServerName();
- int badReportCount = (badReportCounts.containsKey(serverName)
- ? badReportCounts.get(serverName) : 0) + 1;
- badReportCounts.put(serverName, badReportCount);
- if (badReportCount > badSinkThreshold) {
- this.sinks.remove(serverName);
- if (sinks.isEmpty()) {
- chooseSinks();
- }
- }
- }
-
- /**
- * Report that a {@code SinkPeer} successfully replicated a chunk of data.
- *
- * @param sinkPeer
- * The SinkPeer that had a failed replication attempt on it
- */
- public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
- badReportCounts.remove(sinkPeer.getServerName());
- }
-
- /**
- * Refresh the list of sinks.
- */
- public synchronized void chooseSinks() {
- List<ServerName> slaveAddresses = endpoint.getRegionServers();
- if(slaveAddresses.isEmpty()){
- LOG.warn("No sinks available at peer. Will not be able to replicate");
- }
- Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
- int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
- sinks = slaveAddresses.subList(0, numSinks);
- lastUpdateToPeers = System.currentTimeMillis();
- badReportCounts.clear();
- }
-
- public synchronized int getNumSinks() {
- return sinks.size();
- }
-
- @VisibleForTesting
- protected List<ServerName> getSinksForTesting() {
- return Collections.unmodifiableList(sinks);
- }
-
- /**
- * Wraps a replication region server sink to provide the ability to identify
- * it.
- */
- public static class SinkPeer {
- private ServerName serverName;
- private AsyncRegionServerAdmin regionServer;
-
- public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
- this.serverName = serverName;
- this.regionServer = regionServer;
- }
-
- ServerName getServerName() {
- return serverName;
- }
-
- public AsyncRegionServerAdmin getRegionServer() {
- return regionServer;
- }
- }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
new file mode 100644
index 0000000..4160141
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category({ReplicationTests.class, SmallTests.class})
+public class TestHBaseReplicationEndpoint {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestHBaseReplicationEndpoint.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestHBaseReplicationEndpoint.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private HBaseReplicationEndpoint endpoint;
+
+ @Before
+ public void setUp() throws Exception {
+ try {
+ ReplicationEndpoint.Context context =
+ new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(),
+ null, null, null, null, null, null, null);
+ endpoint = new DummyHBaseReplicationEndpoint();
+ endpoint.init(context);
+ } catch (Exception e) {
+ LOG.info("Failed", e);
+ }
+ }
+
+ @Test
+ public void testChooseSinks() {
+ List<ServerName> serverNames = Lists.newArrayList();
+ int totalServers = 20;
+ for (int i = 0; i < totalServers; i++) {
+ serverNames.add(mock(ServerName.class));
+ }
+ ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
+ endpoint.chooseSinks();
+ int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
+ assertEquals(expected, endpoint.getNumSinks());
+ }
+
+ @Test
+ public void testChooseSinksLessThanRatioAvailable() {
+ List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
+ mock(ServerName.class));
+ ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
+ endpoint.chooseSinks();
+ assertEquals(1, endpoint.getNumSinks());
+ }
+
+ @Test
+ public void testReportBadSink() {
+ ServerName serverNameA = mock(ServerName.class);
+ ServerName serverNameB = mock(ServerName.class);
+ ((DummyHBaseReplicationEndpoint) endpoint)
+ .setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
+ endpoint.chooseSinks();
+ // Sanity check
+ assertEquals(1, endpoint.getNumSinks());
+
+ SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+ endpoint.reportBadSink(sinkPeer);
+ // Just reporting a bad sink once shouldn't have an effect
+ assertEquals(1, endpoint.getNumSinks());
+ }
+
+ /**
+ * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
+ * be replicated to anymore.
+ */
+ @Test
+ public void testReportBadSinkPastThreshold() {
+ List<ServerName> serverNames = Lists.newArrayList();
+ int totalServers = 30;
+ for (int i = 0; i < totalServers; i++) {
+ serverNames.add(mock(ServerName.class));
+ }
+ ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
+ endpoint.chooseSinks();
+ // Sanity check
+ int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
+ assertEquals(expected, endpoint.getNumSinks());
+
+ ServerName badSinkServer0 = endpoint.getSinkServers().get(0);
+ SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class));
+ for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
+ endpoint.reportBadSink(sinkPeer);
+ }
+ // Reporting a bad sink more than the threshold count should remove it
+ // from the list of potential sinks
+ assertEquals(expected - 1, endpoint.getNumSinks());
+
+ // now try a sink that has some successes
+ ServerName badSinkServer1 = endpoint.getSinkServers().get(0);
+ sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class));
+ for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
+ endpoint.reportBadSink(sinkPeer);
+ }
+ endpoint.reportSinkSuccess(sinkPeer); // one success
+ endpoint.reportBadSink(sinkPeer);
+ // did not remove the sink, since we had one successful try
+ assertEquals(expected - 1, endpoint.getNumSinks());
+
+ for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD - 1; i++) {
+ endpoint.reportBadSink(sinkPeer);
+ }
+ // still not removed, since the success reset the counter
+ assertEquals(expected - 1, endpoint.getNumSinks());
+ endpoint.reportBadSink(sinkPeer);
+ // but we exhausted the tries
+ assertEquals(expected - 2, endpoint.getNumSinks());
+ }
+
+ @Test
+ public void testReportBadSinkDownToZeroSinks() {
+ List<ServerName> serverNames = Lists.newArrayList();
+ int totalServers = 4;
+ for (int i = 0; i < totalServers; i++) {
+ serverNames.add(mock(ServerName.class));
+ }
+ ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
+ endpoint.chooseSinks();
+ // Sanity check
+ int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
+ assertEquals(expected, endpoint.getNumSinks());
+
+ ServerName serverNameA = endpoint.getSinkServers().get(0);
+ ServerName serverNameB = endpoint.getSinkServers().get(1);
+
+ SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+ SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
+
+ for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
+ endpoint.reportBadSink(sinkPeerA);
+ endpoint.reportBadSink(sinkPeerB);
+ }
+
+ // We've gone down to 0 good sinks, so the replication sinks
+ // should have been refreshed now, so out of 4 servers, 2 are not considered as they are
+ // reported as bad.
+ expected =
+ (int) ((totalServers - 2) * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
+ assertEquals(expected, endpoint.getNumSinks());
+ }
+
+ private static class DummyHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
+
+ List<ServerName> regionServers;
+
+ public void setRegionServers(List<ServerName> regionServers) {
+ this.regionServers = regionServers;
+ }
+
+ @Override
+ public List<ServerName> fetchSlavesAddresses() {
+ return regionServers;
+ }
+
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ return false;
+ }
+
+ @Override
+ public AsyncClusterConnection createConnection(Configuration conf) throws IOException {
+ return null;
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
deleted file mode 100644
index f8a2ab9..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
-import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-@Category({ReplicationTests.class, SmallTests.class})
-public class TestReplicationSinkManager {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
-
- private ReplicationSinkManager sinkManager;
- private HBaseReplicationEndpoint replicationEndpoint;
-
- /**
- * Manage the 'getRegionServers' for the tests below. Override the base class handling
- * of Regionservers. We used to use a mock for this but updated guava/errorprone disallows
- * mocking of classes that implement Service.
- */
- private static class SetServersHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
- List<ServerName> regionServers;
-
- @Override
- public boolean replicate(ReplicateContext replicateContext) {
- return false;
- }
-
- @Override
- public synchronized void setRegionServers(List<ServerName> regionServers) {
- this.regionServers = regionServers;
- }
-
- @Override
- public List<ServerName> getRegionServers() {
- return this.regionServers;
- }
- }
-
- @Before
- public void setUp() {
- this.replicationEndpoint = new SetServersHBaseReplicationEndpoint();
- this.sinkManager = new ReplicationSinkManager(mock(AsyncClusterConnection.class),
- replicationEndpoint, new Configuration());
- }
-
- @Test
- public void testChooseSinks() {
- List<ServerName> serverNames = Lists.newArrayList();
- int totalServers = 20;
- for (int i = 0; i < totalServers; i++) {
- serverNames.add(mock(ServerName.class));
- }
- replicationEndpoint.setRegionServers(serverNames);
- sinkManager.chooseSinks();
- int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
- assertEquals(expected, sinkManager.getNumSinks());
-
- }
-
- @Test
- public void testChooseSinks_LessThanRatioAvailable() {
- List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
- mock(ServerName.class));
- replicationEndpoint.setRegionServers(serverNames);
- sinkManager.chooseSinks();
- assertEquals(1, sinkManager.getNumSinks());
- }
-
- @Test
- public void testReportBadSink() {
- ServerName serverNameA = mock(ServerName.class);
- ServerName serverNameB = mock(ServerName.class);
- replicationEndpoint.setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
- sinkManager.chooseSinks();
- // Sanity check
- assertEquals(1, sinkManager.getNumSinks());
-
- SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
-
- sinkManager.reportBadSink(sinkPeer);
-
- // Just reporting a bad sink once shouldn't have an effect
- assertEquals(1, sinkManager.getNumSinks());
-
- }
-
- /**
- * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
- * be replicated to anymore.
- */
- @Test
- public void testReportBadSink_PastThreshold() {
- List<ServerName> serverNames = Lists.newArrayList();
- int totalServers = 30;
- for (int i = 0; i < totalServers; i++) {
- serverNames.add(mock(ServerName.class));
- }
- replicationEndpoint.setRegionServers(serverNames);
- sinkManager.chooseSinks();
- // Sanity check
- int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
- assertEquals(expected, sinkManager.getNumSinks());
-
- ServerName serverName = sinkManager.getSinksForTesting().get(0);
-
- SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
-
- sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
- for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
- sinkManager.reportBadSink(sinkPeer);
- }
-
- // Reporting a bad sink more than the threshold count should remove it
- // from the list of potential sinks
- assertEquals(expected - 1, sinkManager.getNumSinks());
-
- //
- // now try a sink that has some successes
- //
- serverName = sinkManager.getSinksForTesting().get(0);
-
- sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
- for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
- sinkManager.reportBadSink(sinkPeer);
- }
- sinkManager.reportSinkSuccess(sinkPeer); // one success
- sinkManager.reportBadSink(sinkPeer);
-
- // did not remove the sink, since we had one successful try
- assertEquals(expected - 1, sinkManager.getNumSinks());
-
- for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
- sinkManager.reportBadSink(sinkPeer);
- }
- // still not remove, since the success reset the counter
- assertEquals(expected - 1, sinkManager.getNumSinks());
-
- sinkManager.reportBadSink(sinkPeer);
- // but we exhausted the tries
- assertEquals(expected - 2, sinkManager.getNumSinks());
- }
-
- @Test
- public void testReportBadSink_DownToZeroSinks() {
- List<ServerName> serverNames = Lists.newArrayList();
- int totalServers = 4;
- for (int i = 0; i < totalServers; i++) {
- serverNames.add(mock(ServerName.class));
- }
- replicationEndpoint.setRegionServers(serverNames);
- sinkManager.chooseSinks();
- // Sanity check
- List<ServerName> sinkList = sinkManager.getSinksForTesting();
- int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
- assertEquals(expected, sinkList.size());
-
- ServerName serverNameA = sinkList.get(0);
- ServerName serverNameB = sinkList.get(1);
-
- SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
- SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
-
- for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
- sinkManager.reportBadSink(sinkPeerA);
- sinkManager.reportBadSink(sinkPeerB);
- }
-
- // We've gone down to 0 good sinks, so the replication sinks
- // should have been refreshed now, so out of 4 servers, 2 are not considered as they are
- // reported as bad.
- expected = (int) ((totalServers - 2) * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
- assertEquals(expected, sinkManager.getNumSinks());
- }
-
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
index 3c88ab3..0901291 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
@@ -175,14 +174,9 @@ public class TestSerialReplicationEndpoint {
}
@Override
- public synchronized List<ServerName> getRegionServers() {
+ public synchronized int getNumSinks() {
// Return multiple server names for endpoint parallel replication.
- return new ArrayList<>(
- ImmutableList.of(ServerName.valueOf("www.example.com", 12016, 1525245876026L),
- ServerName.valueOf("www.example2.com", 12016, 1525245876026L),
- ServerName.valueOf("www.example3.com", 12016, 1525245876026L),
- ServerName.valueOf("www.example4.com", 12016, 1525245876026L),
- ServerName.valueOf("www.example4.com", 12016, 1525245876026L)));
+ return 10;
}
}
}