You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/06/10 02:53:09 UTC

hbase git commit: HBASE-18192: Replication drops recovered queues on region server shutdown

Repository: hbase
Updated Branches:
  refs/heads/master e5ea45705 -> eb2dc5d2a


HBASE-18192: Replication drops recovered queues on region server shutdown

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/eb2dc5d2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/eb2dc5d2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/eb2dc5d2

Branch: refs/heads/master
Commit: eb2dc5d2a524f816fc5cf707b853117bc6ada01a
Parents: e5ea457
Author: Ashu Pachauri <as...@gmail.com>
Authored: Fri Jun 9 13:49:33 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Fri Jun 9 19:52:58 2017 -0700

----------------------------------------------------------------------
 .../RecoveredReplicationSource.java             |   8 +-
 ...RecoveredReplicationSourceShipperThread.java |   8 +-
 .../regionserver/ReplicationSource.java         |   4 +-
 .../ReplicationSourceShipperThread.java         |  35 ++++-
 .../replication/TestReplicationSource.java      | 127 ++++++++++++++++++-
 5 files changed, 166 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/eb2dc5d2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 388b8d4..d3bcff1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -160,14 +160,14 @@ public class RecoveredReplicationSource extends ReplicationSource {
     // use synchronize to make sure one last thread will clean the queue
     synchronized (workerThreads) {
       Threads.sleep(100);// wait a short while for other worker thread to fully exit
-      boolean allOtherTaskDone = true;
+      boolean allTasksDone = true;
       for (ReplicationSourceShipperThread worker : workerThreads.values()) {
-        if (worker.isActive()) {
-          allOtherTaskDone = false;
+        if (!worker.isFinished()) {
+          allTasksDone = false;
           break;
         }
       }
-      if (allOtherTaskDone) {
+      if (allTasksDone) {
         manager.closeRecoveredQueue(this);
         LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
             + getStats());

http://git-wip-us.apache.org/repos/asf/hbase/blob/eb2dc5d2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
index 024b0c4..65aeb2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
@@ -51,6 +51,7 @@ public class RecoveredReplicationSourceShipperThread extends ReplicationSourceSh
 
   @Override
   public void run() {
+    setWorkerState(WorkerState.RUNNING);
     // Loop until we close down
     while (isActive()) {
       int sleepMultiplier = 1;
@@ -77,7 +78,7 @@ public class RecoveredReplicationSourceShipperThread extends ReplicationSourceSh
           LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
               + source.getPeerClusterZnode());
           source.getSourceMetrics().incrCompletedRecoveryQueue();
-          setWorkerRunning(false);
+          setWorkerState(WorkerState.FINISHED);
           continue;
         }
       } catch (InterruptedException e) {
@@ -85,8 +86,11 @@ public class RecoveredReplicationSourceShipperThread extends ReplicationSourceSh
         Thread.currentThread().interrupt();
       }
     }
-
     source.tryFinish();
+    // If the worker exits run loop without finishing its task, mark it as stopped.
+    if (!isFinished()) {
+      setWorkerState(WorkerState.STOPPED);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/eb2dc5d2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d098fd9..1dbf07f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -57,13 +57,13 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
+
 /**
  * Class that handles the source of a replication stream.
  * Currently does not handle more than 1 slave
@@ -448,7 +448,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.sourceRunning = false;
     Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
     for (ReplicationSourceShipperThread worker : workers) {
-      worker.setWorkerRunning(false);
+      worker.stopWorker();
       worker.entryReader.interrupt();
       worker.interrupt();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/eb2dc5d2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
index b0f7fee..d1a8ac2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
@@ -54,6 +54,13 @@ import com.google.common.cache.LoadingCache;
 public class ReplicationSourceShipperThread extends Thread {
   private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class);
 
+  // Hold the state of a replication worker thread
+  public enum WorkerState {
+    RUNNING,
+    STOPPED,
+    FINISHED,  // The worker is done processing a recovered queue
+  }
+
   protected final Configuration conf;
   protected final String walGroupId;
   protected final PriorityBlockingQueue<Path> queue;
@@ -63,8 +70,8 @@ public class ReplicationSourceShipperThread extends Thread {
   protected long lastLoggedPosition = -1;
   // Path of the current log
   protected volatile Path currentPath;
-  // Indicates whether this particular worker is running
-  private boolean workerRunning = true;
+  // Current state of the worker thread
+  private WorkerState state;
   protected ReplicationSourceWALReaderThread entryReader;
 
   // How long should we sleep for each retry
@@ -97,6 +104,7 @@ public class ReplicationSourceShipperThread extends Thread {
 
   @Override
   public void run() {
+    setWorkerState(WorkerState.RUNNING);
     // Loop until we close down
     while (isActive()) {
       int sleepMultiplier = 1;
@@ -126,6 +134,10 @@ public class ReplicationSourceShipperThread extends Thread {
         Thread.currentThread().interrupt();
       }
     }
+    // If the worker exits run loop without finishing its task, mark it as stopped.
+    if (state != WorkerState.FINISHED) {
+      setWorkerState(WorkerState.STOPPED);
+    }
   }
 
   /**
@@ -307,12 +319,23 @@ public class ReplicationSourceShipperThread extends Thread {
   }
 
   protected boolean isActive() {
-    return source.isSourceActive() && workerRunning && !isInterrupted();
+    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
+  }
+
+  public void setWorkerState(WorkerState state) {
+    this.state = state;
+  }
+
+  public WorkerState getWorkerState() {
+    return state;
+  }
+
+  public void stopWorker() {
+    setWorkerState(WorkerState.STOPPED);
   }
 
-  public void setWorkerRunning(boolean workerRunning) {
-    entryReader.setReaderRunning(workerRunning);
-    this.workerRunning = workerRunning;
+  public boolean isFinished() {
+    return state == WorkerState.FINISHED;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/eb2dc5d2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 7461edb..c3b7eaf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -31,6 +33,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.Predicate;
@@ -38,6 +42,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALProvider;
@@ -49,12 +56,12 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
-import static org.mockito.Mockito.mock;
 
 @Category({ReplicationTests.class, MediumTests.class})
 public class TestReplicationSource {
@@ -63,10 +70,12 @@ public class TestReplicationSource {
       LogFactory.getLog(TestReplicationSource.class);
   private final static HBaseTestingUtility TEST_UTIL =
       new HBaseTestingUtility();
+  private final static HBaseTestingUtility TEST_UTIL_PEER =
+      new HBaseTestingUtility();
   private static FileSystem FS;
   private static Path oldLogDir;
   private static Path logDir;
-  private static Configuration conf = HBaseConfiguration.create();
+  private static Configuration conf = TEST_UTIL.getConfiguration();
 
   /**
    * @throws java.lang.Exception
@@ -82,6 +91,13 @@ public class TestReplicationSource {
     if (FS.exists(logDir)) FS.delete(logDir, true);
   }
 
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
   /**
    * Sanity check that we can move logs around while we are reading
    * from them. Should this test fail, ReplicationSource would have a hard
@@ -172,5 +188,112 @@ public class TestReplicationSource {
 
   }
 
+  /**
+   * Tests that recovered queues are preserved on a regionserver shutdown.
+   * See HBASE-18192
+   * @throws Exception
+   */
+  @Test
+  public void testServerShutdownRecoveredQueue() throws Exception {
+    try {
+      // Ensure single-threaded WAL
+      conf.set("hbase.wal.provider", "defaultProvider");
+      conf.setInt("replication.sleep.before.failover", 2000);
+      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
+      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
+      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
+      TEST_UTIL_PEER.startMiniCluster(1);
+
+      HRegionServer serverA = cluster.getRegionServer(0);
+      final ReplicationSourceManager managerA =
+          ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
+      HRegionServer serverB = cluster.getRegionServer(1);
+      final ReplicationSourceManager managerB =
+          ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
+      final Admin admin = TEST_UTIL.getAdmin();
+
+      final String peerId = "TestPeer";
+      admin.addReplicationPeer(peerId,
+          new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()));
+      // Wait for replication sources to come up
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
+        }
+      });
+      // Disabling peer makes sure there is at least one log to claim when the server dies
+      // The recovered queue will also stay there until the peer is disabled even if the
+      // WALs it contains have no data.
+      admin.disableReplicationPeer(peerId);
+
+      // Stopping serverA
+      // It's queues should be claimed by the only other alive server i.e. serverB
+      cluster.stopRegionServer(serverA.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerB.getOldSources().size() == 1;
+        }
+      });
+
+      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
+      serverC.waitForServerOnline();
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return serverC.getReplicationSourceService() != null;
+        }
+      });
+      final ReplicationSourceManager managerC =
+          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
+      // Sanity check
+      assertEquals(0, managerC.getOldSources().size());
+
+      // Stopping serverB
+      // Now serverC should have two recovered queues:
+      // 1. The serverB's normal queue
+      // 2. serverA's recovered queue on serverB
+      cluster.stopRegionServer(serverB.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 2;
+        }
+      });
+      admin.enableReplicationPeer(peerId);
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 0;
+        }
+      });
+    } finally {
+      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
+    }
+  }
+
+  /**
+   * Regionserver implementation that adds a delay on the graceful shutdown.
+   */
+  public static class ShutdownDelayRegionServer extends HRegionServer {
+    public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm)
+        throws IOException, InterruptedException {
+      super(conf, csm);
+    }
+
+    @Override
+    protected void stopServiceThreads() {
+      // Add a delay before service threads are shutdown.
+      // This will keep the zookeeper connection alive for the duration of the delay.
+      LOG.info("Adding a delay to the regionserver shutdown");
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException ex) {
+        LOG.error("Interrupted while sleeping");
+      }
+      super.stopServiceThreads();
+    }
+  }
+
 }