You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/06/10 03:03:25 UTC
hbase git commit: HBASE-18192: Replication drops recovered queues on
region server shutdown (Ashu Pachauri)
Repository: hbase
Updated Branches:
refs/heads/branch-1.2 a2617b00c -> 96e48c3df
HBASE-18192: Replication drops recovered queues on region server shutdown (Ashu Pachauri)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/96e48c3d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/96e48c3d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/96e48c3d
Branch: refs/heads/branch-1.2
Commit: 96e48c3df597fc1450546818e2bd34cfc1fd5c10
Parents: a2617b0
Author: tedyu <yu...@gmail.com>
Authored: Fri Jun 9 20:03:20 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Fri Jun 9 20:03:20 2017 -0700
----------------------------------------------------------------------
.../hbase/regionserver/HRegionServer.java | 3 +-
.../regionserver/ReplicationSource.java | 42 ++++--
.../replication/TestReplicationSource.java | 128 ++++++++++++++++++-
3 files changed, 161 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/96e48c3d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 129b5a7..4ae0286 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2213,7 +2213,8 @@ public class HRegionServer extends HasThread implements
* @return Return the object that implements the replication
* source service.
*/
- ReplicationSourceService getReplicationSourceService() {
+ @VisibleForTesting
+ public ReplicationSourceService getReplicationSourceService() {
return replicationSourceHandler;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/96e48c3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2d5dedd..2285a5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -138,6 +138,12 @@ public class ReplicationSource extends Thread
private AtomicInteger logQueueSize = new AtomicInteger(0);
private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
+ // Hold the state of a replication worker thread
+ public enum WorkerState {
+ RUNNING,
+ STOPPED,
+ FINISHED // The worker is done processing a recovered queue
+ }
/**
* Instantiation method used by region servers
@@ -362,7 +368,7 @@ public class ReplicationSource extends Thread
this.sourceRunning = false;
Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
for (ReplicationSourceWorkerThread worker : workers) {
- worker.setWorkerRunning(false);
+ worker.setWorkerState(WorkerState.STOPPED);
worker.interrupt();
}
ListenableFuture<Service.State> future = null;
@@ -477,8 +483,8 @@ public class ReplicationSource extends Thread
private int currentNbOperations = 0;
// Current size of data we need to replicate
private int currentSize = 0;
- // Indicates whether this particular worker is running
- private boolean workerRunning = true;
+ // Current state of the worker thread. volatile because it is written by other threads
+ // (the owning source's terminate loop) and read by sibling workers during queue cleanup.
+ private volatile WorkerState state;
public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
@@ -491,6 +497,7 @@ public class ReplicationSource extends Thread
@Override
public void run() {
+ setWorkerState(WorkerState.RUNNING);
// If this is recovered, the queue is already full and the first log
// normally has a position (unless the RS failed between 2 logs)
if (this.replicationQueueInfo.isQueueRecovered()) {
@@ -623,13 +630,13 @@ public class ReplicationSource extends Thread
sleepMultiplier = 1;
shipEdits(currentWALisBeingWrittenTo, entries);
}
- if (replicationQueueInfo.isQueueRecovered()) {
+ if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
// use synchronize to make sure one last thread will clean the queue
synchronized (workerThreads) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
boolean allOtherTaskDone = true;
for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
- if (!worker.equals(this) && worker.isAlive()) {
+ if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) {
allOtherTaskDone = false;
break;
}
@@ -641,6 +648,10 @@ public class ReplicationSource extends Thread
}
}
}
+ // If the worker exits run loop without finishing its task, mark it as stopped.
+ if (state != WorkerState.FINISHED) {
+ setWorkerState(WorkerState.STOPPED);
+ }
}
/**
@@ -1023,7 +1034,7 @@ public class ReplicationSource extends Thread
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ peerClusterZnode);
metrics.incrCompletedRecoveryQueue();
- workerRunning = false;
+ setWorkerState(WorkerState.FINISHED);
return true;
}
return false;
@@ -1054,7 +1065,7 @@ public class ReplicationSource extends Thread
}
private boolean isWorkerActive() {
- return !stopper.isStopped() && workerRunning && !isInterrupted();
+ return !stopper.isStopped() && state == WorkerState.RUNNING && !isInterrupted();
}
private void terminate(String reason, Exception cause) {
@@ -1065,13 +1076,26 @@ public class ReplicationSource extends Thread
LOG.error("Closing worker for wal group " + this.walGroupId
+ " because an error occurred: " + reason, cause);
}
+ setWorkerState(WorkerState.STOPPED);
this.interrupt();
Threads.shutdown(this, sleepForRetries);
LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
}
- public void setWorkerRunning(boolean workerRunning) {
- this.workerRunning = workerRunning;
+ /**
+ * Set the worker state
+ * @param state the new state of this worker
+ */
+ public void setWorkerState(WorkerState state) {
+ this.state = state;
+ }
+
+ /**
+ * Get the current state of this worker.
+ * @return WorkerState
+ */
+ public WorkerState getWorkerState() {
+ return state;
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/96e48c3d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 9bf0e93..8c597fa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -18,9 +18,11 @@
*/
package org.apache.hadoop.hbase.replication;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -30,6 +32,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
@@ -37,6 +41,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALProvider;
@@ -46,7 +54,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
+
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -60,10 +69,12 @@ public class TestReplicationSource {
LogFactory.getLog(TestReplicationSource.class);
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
+ private final static HBaseTestingUtility TEST_UTIL_PEER =
+ new HBaseTestingUtility();
private static FileSystem FS;
private static Path oldLogDir;
private static Path logDir;
- private static Configuration conf = HBaseConfiguration.create();
+ private static Configuration conf = TEST_UTIL.getConfiguration();
/**
* @throws java.lang.Exception
@@ -79,6 +90,13 @@ public class TestReplicationSource {
if (FS.exists(logDir)) FS.delete(logDir, true);
}
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL_PEER.shutdownMiniHBaseCluster();
+ TEST_UTIL.shutdownMiniHBaseCluster();
+ TEST_UTIL.shutdownMiniDFSCluster();
+ }
+
/**
* Sanity check that we can move logs around while we are reading
* from them. Should this test fail, ReplicationSource would have a hard
@@ -165,5 +183,111 @@ public class TestReplicationSource {
}
+ /**
+ * Tests that recovered queues are preserved on a regionserver shutdown.
+ * See HBASE-18192
+ * @throws Exception
+ */
+ @Test
+ public void testServerShutdownRecoveredQueue() throws Exception {
+ // Declared outside the try so the admin (and its connection) is always closed.
+ ReplicationAdmin replicationAdmin = null;
+ try {
+ // Ensure single-threaded WAL
+ conf.set("hbase.wal.provider", "defaultProvider");
+ conf.setInt("replication.sleep.before.failover", 2000);
+ // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
+ conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
+ MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
+ TEST_UTIL_PEER.startMiniCluster(1);
+
+ HRegionServer serverA = cluster.getRegionServer(0);
+ final ReplicationSourceManager managerA =
+ ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
+ HRegionServer serverB = cluster.getRegionServer(1);
+ final ReplicationSourceManager managerB =
+ ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
+ replicationAdmin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
+
+ final String peerId = "TestPeer";
+ replicationAdmin.addPeer(peerId,
+ new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()), null);
+ // Wait for replication sources to come up
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
+ }
+ });
+ // Disabling peer makes sure there is at least one log to claim when the server dies
+ // The recovered queue will also stay there until the peer is re-enabled, even if the
+ // WALs it contains have no data.
+ replicationAdmin.disablePeer(peerId);
+
+ // Stopping serverA
+ // Its queues should be claimed by the only other alive server i.e. serverB
+ cluster.stopRegionServer(serverA.getServerName());
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return managerB.getOldSources().size() == 1;
+ }
+ });
+
+ final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
+ serverC.waitForServerOnline();
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return serverC.getReplicationSourceService() != null;
+ }
+ });
+ final ReplicationSourceManager managerC =
+ ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
+ // Sanity check
+ assertEquals(0, managerC.getOldSources().size());
+
+ // Stopping serverB
+ // Now serverC should have two recovered queues:
+ // 1. The serverB's normal queue
+ // 2. serverA's recovered queue on serverB
+ cluster.stopRegionServer(serverB.getServerName());
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return managerC.getOldSources().size() == 2;
+ }
+ });
+ replicationAdmin.enablePeer(peerId);
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return managerC.getOldSources().size() == 0;
+ }
+ });
+ } finally {
+ conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
+ if (replicationAdmin != null) {
+ replicationAdmin.close();
+ }
+ }
+ }
+
+ /**
+ * Regionserver implementation that adds a delay on the graceful shutdown.
+ */
+ public static class ShutdownDelayRegionServer extends HRegionServer {
+ public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
+ super(conf);
+ }
+
+ public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm)
+ throws IOException, InterruptedException {
+ super(conf, csm);
+ }
+
+ @Override
+ protected void stopServiceThreads() {
+ // Add a delay before service threads are shutdown.
+ // This will keep the zookeeper connection alive for the duration of the delay.
+ LOG.info("Adding a delay to the regionserver shutdown");
+ boolean interrupted = false;
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted while sleeping");
+ interrupted = true;
+ }
+ super.stopServiceThreads();
+ // Restore the interrupt status only after the real shutdown has run, so the
+ // re-set flag cannot interfere with super.stopServiceThreads().
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
}