You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this rendering.
Posted to commits@hbase.apache.org by ec...@apache.org on 2015/12/18 20:51:36 UTC
hbase git commit: HBASE-15001 Fix thread-safety issues with replication
Repository: hbase
Updated Branches:
refs/heads/branch-1.2 6330127e1 -> f656701f1
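HBASE-15001 Fix thread-safety issues with replication

ReplicationSinkManager and HBaseInterClusterReplicationEndpoint
perform certain thread-unsafe operations that can lead to undesirable
behavior when multiwal is enabled. This change makes the public
methods of ReplicationSinkManager synchronized and replaces getSinks()
(which returned the mutable internal list) with getNumSinks() plus a
read-only, test-only view. It also makes the endpoint return false,
so the source retries, unless every edit in the batch was replicated.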
Signed-off-by: Elliott Clark <ec...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f656701f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f656701f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f656701f
Branch: refs/heads/branch-1.2
Commit: f656701f156a28215ce690c566f8e529f4e3a55c
Parents: 6330127
Author: Ashu Pachauri <as...@gmail.com>
Authored: Thu Dec 17 13:25:39 2015 -0800
Committer: Elliott Clark <ec...@apache.org>
Committed: Fri Dec 18 11:49:47 2015 -0800
----------------------------------------------------------------------
.../HBaseInterClusterReplicationEndpoint.java | 27 +++++++++++++++-----
.../regionserver/ReplicationSinkManager.java | 21 ++++++++++-----
.../TestReplicationSinkManager.java | 26 +++++++++----------
3 files changed, 49 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f656701f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 8f77065..9300d4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -126,9 +126,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
int sleepMultiplier = 1;
// Connect to peer cluster first, unless we have to stop
- while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+ while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
replicationSinkMgr.chooseSinks();
- if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+ if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
sleepMultiplier++;
}
@@ -163,19 +163,24 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
List<Entry> entries = replicateContext.getEntries();
String walGroupId = replicateContext.getWalGroupId();
int sleepMultiplier = 1;
+ int numReplicated = 0;
if (!peersSelected && this.isRunning()) {
connectToPeers();
peersSelected = true;
}
- if (replicationSinkMgr.getSinks().size() == 0) {
+ int numSinks = replicationSinkMgr.getNumSinks();
+ if (numSinks == 0) {
+ LOG.warn("No replication sinks found, returning without replicating. The source should retry"
+ + " with the same set of edits.");
return false;
}
+
// minimum of: configured threads, number of 100-waledit batches,
// and number of current sinks
- int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1),
- replicationSinkMgr.getSinks().size());
+ int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
+
List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
if (n == 1) {
entryLists.add(entries);
@@ -220,7 +225,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// wait for all futures, remove successful parts
// (only the remaining parts will be retried)
Future<Integer> f = pool.take();
- entryLists.set(f.get().intValue(), Collections.<Entry>emptyList());
+ int index = f.get().intValue();
+ int batchSize = entryLists.get(index).size();
+ entryLists.set(index, Collections.<Entry>emptyList());
+ // Now, we have marked the batch as done replicating, record its size
+ numReplicated += batchSize;
} catch (InterruptedException ie) {
iox = new IOException(ie);
} catch (ExecutionException ee) {
@@ -232,6 +241,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// if we had any exceptions, try again
throw iox;
}
+ if (numReplicated != entries.size()) {
+ // Something went wrong here and we don't know what, let's just fail and retry.
+ LOG.warn("The number of edits replicated is different from the number received,"
+ + " failing for now.");
+ return false;
+ }
// update metrics
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
walGroupId);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f656701f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 76fa6c2..0469f9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -105,7 +106,7 @@ public class ReplicationSinkManager {
*
* @return a replication sink to replicate to
*/
- public SinkPeer getReplicationSink() throws IOException {
+ public synchronized SinkPeer getReplicationSink() throws IOException {
if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
LOG.info("Current list of sinks is out of date or empty, updating");
chooseSinks();
@@ -127,7 +128,7 @@ public class ReplicationSinkManager {
* @param sinkPeer
* The SinkPeer that had a failed replication attempt on it
*/
- public void reportBadSink(SinkPeer sinkPeer) {
+ public synchronized void reportBadSink(SinkPeer sinkPeer) {
ServerName serverName = sinkPeer.getServerName();
int badReportCount = (badReportCounts.containsKey(serverName)
? badReportCounts.get(serverName) : 0) + 1;
@@ -146,11 +147,14 @@ public class ReplicationSinkManager {
* @param sinkPeer
* The SinkPeer that had a failed replication attempt on it
*/
- public void reportSinkSuccess(SinkPeer sinkPeer) {
+ public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
badReportCounts.remove(sinkPeer.getServerName());
}
- void chooseSinks() {
+ /**
+ * Refresh the list of sinks.
+ */
+ public synchronized void chooseSinks() {
List<ServerName> slaveAddresses = endpoint.getRegionServers();
Collections.shuffle(slaveAddresses, random);
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
@@ -159,8 +163,13 @@ public class ReplicationSinkManager {
badReportCounts.clear();
}
- List<ServerName> getSinks() {
- return sinks;
+ public synchronized int getNumSinks() {
+ return sinks.size();
+ }
+
+ @VisibleForTesting
+ protected List<ServerName> getSinksForTesting() {
+ return Collections.unmodifiableList(sinks);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/f656701f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 4eb7f51..c0b7d0c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -66,7 +66,7 @@ public class TestReplicationSinkManager {
sinkManager.chooseSinks();
- assertEquals(2, sinkManager.getSinks().size());
+ assertEquals(2, sinkManager.getNumSinks());
}
@@ -80,7 +80,7 @@ public class TestReplicationSinkManager {
sinkManager.chooseSinks();
- assertEquals(1, sinkManager.getSinks().size());
+ assertEquals(1, sinkManager.getNumSinks());
}
@Test
@@ -92,14 +92,14 @@ public class TestReplicationSinkManager {
sinkManager.chooseSinks();
// Sanity check
- assertEquals(1, sinkManager.getSinks().size());
+ assertEquals(1, sinkManager.getNumSinks());
SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
sinkManager.reportBadSink(sinkPeer);
// Just reporting a bad sink once shouldn't have an effect
- assertEquals(1, sinkManager.getSinks().size());
+ assertEquals(1, sinkManager.getNumSinks());
}
@@ -119,9 +119,9 @@ public class TestReplicationSinkManager {
sinkManager.chooseSinks();
// Sanity check
- assertEquals(3, sinkManager.getSinks().size());
+ assertEquals(3, sinkManager.getNumSinks());
- ServerName serverName = sinkManager.getSinks().get(0);
+ ServerName serverName = sinkManager.getSinksForTesting().get(0);
SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
@@ -132,12 +132,12 @@ public class TestReplicationSinkManager {
// Reporting a bad sink more than the threshold count should remove it
// from the list of potential sinks
- assertEquals(2, sinkManager.getSinks().size());
+ assertEquals(2, sinkManager.getNumSinks());
//
// now try a sink that has some successes
//
- serverName = sinkManager.getSinks().get(0);
+ serverName = sinkManager.getSinksForTesting().get(0);
sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
@@ -147,17 +147,17 @@ public class TestReplicationSinkManager {
sinkManager.reportBadSink(sinkPeer);
// did not remove the sink, since we had one successful try
- assertEquals(2, sinkManager.getSinks().size());
+ assertEquals(2, sinkManager.getNumSinks());
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
sinkManager.reportBadSink(sinkPeer);
}
// still not remove, since the success reset the counter
- assertEquals(2, sinkManager.getSinks().size());
+ assertEquals(2, sinkManager.getNumSinks());
sinkManager.reportBadSink(sinkPeer);
// but we exhausted the tries
- assertEquals(1, sinkManager.getSinks().size());
+ assertEquals(1, sinkManager.getNumSinks());
}
@Test
@@ -173,7 +173,7 @@ public class TestReplicationSinkManager {
sinkManager.chooseSinks();
// Sanity check
- List<ServerName> sinkList = sinkManager.getSinks();
+ List<ServerName> sinkList = sinkManager.getSinksForTesting();
assertEquals(2, sinkList.size());
ServerName serverNameA = sinkList.get(0);
@@ -189,7 +189,7 @@ public class TestReplicationSinkManager {
// We've gone down to 0 good sinks, so the replication sinks
// should have been refreshed now
- assertEquals(2, sinkManager.getSinks().size());
+ assertEquals(2, sinkManager.getNumSinks());
}
}