You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/06/10 03:03:25 UTC

hbase git commit: HBASE-18192: Replication drops recovered queues on region server shutdown (Ashu Pachauri)

Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 a2617b00c -> 96e48c3df


HBASE-18192: Replication drops recovered queues on region server shutdown (Ashu Pachauri)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/96e48c3d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/96e48c3d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/96e48c3d

Branch: refs/heads/branch-1.2
Commit: 96e48c3df597fc1450546818e2bd34cfc1fd5c10
Parents: a2617b0
Author: tedyu <yu...@gmail.com>
Authored: Fri Jun 9 20:03:20 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Fri Jun 9 20:03:20 2017 -0700

----------------------------------------------------------------------
 .../hbase/regionserver/HRegionServer.java       |   3 +-
 .../regionserver/ReplicationSource.java         |  42 ++++--
 .../replication/TestReplicationSource.java      | 128 ++++++++++++++++++-
 3 files changed, 161 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/96e48c3d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 129b5a7..4ae0286 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2213,7 +2213,8 @@ public class HRegionServer extends HasThread implements
    * @return Return the object that implements the replication
    * source service.
    */
-  ReplicationSourceService getReplicationSourceService() {
+  @VisibleForTesting
+  public ReplicationSourceService getReplicationSourceService() {
     return replicationSourceHandler;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/96e48c3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2d5dedd..2285a5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -138,6 +138,12 @@ public class ReplicationSource extends Thread
   private AtomicInteger logQueueSize = new AtomicInteger(0);
   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
+  // Hold the state of a replication worker thread
+  public enum WorkerState {
+    RUNNING,
+    STOPPED,
+    FINISHED  // The worker is done processing a recovered queue
+  }
 
   /**
    * Instantiation method used by region servers
@@ -362,7 +368,7 @@ public class ReplicationSource extends Thread
     this.sourceRunning = false;
     Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
     for (ReplicationSourceWorkerThread worker : workers) {
-      worker.setWorkerRunning(false);
+      worker.setWorkerState(WorkerState.STOPPED);
       worker.interrupt();
     }
     ListenableFuture<Service.State> future = null;
@@ -477,8 +483,8 @@ public class ReplicationSource extends Thread
     private int currentNbOperations = 0;
     // Current size of data we need to replicate
     private int currentSize = 0;
-    // Indicates whether this particular worker is running
-    private boolean workerRunning = true;
+    // Current state of the worker thread
+    private WorkerState state;
 
     public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
         ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
@@ -491,6 +497,7 @@ public class ReplicationSource extends Thread
 
     @Override
     public void run() {
+      setWorkerState(WorkerState.RUNNING);
       // If this is recovered, the queue is already full and the first log
       // normally has a position (unless the RS failed between 2 logs)
       if (this.replicationQueueInfo.isQueueRecovered()) {
@@ -623,13 +630,13 @@ public class ReplicationSource extends Thread
         sleepMultiplier = 1;
         shipEdits(currentWALisBeingWrittenTo, entries);
       }
-      if (replicationQueueInfo.isQueueRecovered()) {
+      if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
         // use synchronize to make sure one last thread will clean the queue
         synchronized (workerThreads) {
           Threads.sleep(100);// wait a short while for other worker thread to fully exit
           boolean allOtherTaskDone = true;
           for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
-            if (!worker.equals(this) && worker.isAlive()) {
+            if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) {
               allOtherTaskDone = false;
               break;
             }
@@ -641,6 +648,10 @@ public class ReplicationSource extends Thread
           }
         }
       }
+      // If the worker exits run loop without finishing it's task, mark it as stopped.
+      if (state != WorkerState.FINISHED) {
+        setWorkerState(WorkerState.STOPPED);
+      }
     }
 
     /**
@@ -1023,7 +1034,7 @@ public class ReplicationSource extends Thread
         LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
             + peerClusterZnode);
         metrics.incrCompletedRecoveryQueue();
-        workerRunning = false;
+        setWorkerState(WorkerState.FINISHED);
         return true;
       }
       return false;
@@ -1054,7 +1065,7 @@ public class ReplicationSource extends Thread
     }
 
     private boolean isWorkerActive() {
-      return !stopper.isStopped() && workerRunning && !isInterrupted();
+      return !stopper.isStopped() && state == WorkerState.RUNNING && !isInterrupted();
     }
 
     private void terminate(String reason, Exception cause) {
@@ -1065,13 +1076,26 @@ public class ReplicationSource extends Thread
         LOG.error("Closing worker for wal group " + this.walGroupId
             + " because an error occurred: " + reason, cause);
       }
+      setWorkerState(WorkerState.STOPPED);
       this.interrupt();
       Threads.shutdown(this, sleepForRetries);
       LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
     }
 
-    public void setWorkerRunning(boolean workerRunning) {
-      this.workerRunning = workerRunning;
+    /**
+     * Set the worker state
+     * @param state
+     */
+    public void setWorkerState(WorkerState state) {
+      this.state = state;
+    }
+
+    /**
+     * Get the current state of this worker.
+     * @return WorkerState
+     */
+    public WorkerState getWorkerState() {
+      return state;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/96e48c3d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 9bf0e93..8c597fa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -30,6 +32,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.Predicate;
@@ -37,6 +41,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALProvider;
@@ -46,7 +54,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
+
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -60,10 +69,12 @@ public class TestReplicationSource {
       LogFactory.getLog(TestReplicationSource.class);
   private final static HBaseTestingUtility TEST_UTIL =
       new HBaseTestingUtility();
+  private final static HBaseTestingUtility TEST_UTIL_PEER =
+      new HBaseTestingUtility();
   private static FileSystem FS;
   private static Path oldLogDir;
   private static Path logDir;
-  private static Configuration conf = HBaseConfiguration.create();
+  private static Configuration conf = TEST_UTIL.getConfiguration();
 
   /**
    * @throws java.lang.Exception
@@ -79,6 +90,13 @@ public class TestReplicationSource {
     if (FS.exists(logDir)) FS.delete(logDir, true);
   }
 
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
   /**
    * Sanity check that we can move logs around while we are reading
    * from them. Should this test fail, ReplicationSource would have a hard
@@ -165,5 +183,111 @@ public class TestReplicationSource {
 
   }
 
+  /**
+   * Tests that recovered queues are preserved on a regionserver shutdown.
+   * See HBASE-18192
+   * @throws Exception
+   */
+  @Test
+  public void testServerShutdownRecoveredQueue() throws Exception {
+    try {
+      // Ensure single-threaded WAL
+      conf.set("hbase.wal.provider", "defaultProvider");
+      conf.setInt("replication.sleep.before.failover", 2000);
+      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
+      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
+      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
+      TEST_UTIL_PEER.startMiniCluster(1);
+
+      HRegionServer serverA = cluster.getRegionServer(0);
+      final ReplicationSourceManager managerA =
+          ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
+      HRegionServer serverB = cluster.getRegionServer(1);
+      final ReplicationSourceManager managerB =
+          ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
+      final ReplicationAdmin replicationAdmin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
+
+      final String peerId = "TestPeer";
+      replicationAdmin.addPeer(peerId,
+          new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()), null);
+      // Wait for replication sources to come up
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
+        }
+      });
+      // Disabling peer makes sure there is at least one log to claim when the server dies
+      // The recovered queue will also stay there until the peer is disabled even if the
+      // WALs it contains have no data.
+      replicationAdmin.disablePeer(peerId);
+
+      // Stopping serverA
+      // It's queues should be claimed by the only other alive server i.e. serverB
+      cluster.stopRegionServer(serverA.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerB.getOldSources().size() == 1;
+        }
+      });
+
+      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
+      serverC.waitForServerOnline();
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return serverC.getReplicationSourceService() != null;
+        }
+      });
+      final ReplicationSourceManager managerC =
+          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
+      // Sanity check
+      assertEquals(0, managerC.getOldSources().size());
+
+      // Stopping serverB
+      // Now serverC should have two recovered queues:
+      // 1. The serverB's normal queue
+      // 2. serverA's recovered queue on serverB
+      cluster.stopRegionServer(serverB.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 2;
+        }
+      });
+      replicationAdmin.enablePeer(peerId);
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 0;
+        }
+      });
+    } finally {
+      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
+    }
+  }
+
+  /**
+   * Regionserver implementation that adds a delay on the graceful shutdown.
+   */
+  public static class ShutdownDelayRegionServer extends HRegionServer {
+    public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm)
+        throws IOException, InterruptedException {
+      super(conf, csm);
+    }
+
+    @Override
+    protected void stopServiceThreads() {
+      // Add a delay before service threads are shutdown.
+      // This will keep the zookeeper connection alive for the duration of the delay.
+      LOG.info("Adding a delay to the regionserver shutdown");
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException ex) {
+        LOG.error("Interrupted while sleeping");
+      }
+      super.stopServiceThreads();
+    }
+  }
 }