You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/06/10 19:46:16 UTC

[4/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs

HBASE-18137 Replication gets stuck for empty WALs

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/385b7924
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/385b7924
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/385b7924

Branch: refs/heads/branch-2
Commit: 385b792446ea1b0c58b7365904d677ba48eec930
Parents: eca1ec3
Author: Vincent <vi...@gmail.com>
Authored: Fri Jun 9 18:47:14 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jun 10 12:45:40 2017 -0700

----------------------------------------------------------------------
 .../ReplicationSourceShipperThread.java         |  2 +-
 .../ReplicationSourceWALReaderThread.java       | 30 ++++++++
 .../hbase/replication/TestReplicationBase.java  |  1 +
 .../replication/TestReplicationSmallTests.java  | 80 ++++++++++++++++++++
 4 files changed, 112 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
index d1a8ac2..6807da2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
@@ -303,7 +303,7 @@ public class ReplicationSourceShipperThread extends Thread {
   }
 
   public Path getCurrentPath() {
-    return this.currentPath;
+    return this.entryReader.getCurrentPath(); // reader's path, not this.currentPath
   }
 
   public long getCurrentPosition() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index ad08866..c1af6e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -189,6 +190,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
           sleepMultiplier++;
         } else {
           LOG.error("Failed to read stream of replication entries", e);
+          handleEofException(e);
         }
         Threads.sleep(sleepForRetries * sleepMultiplier);
       } catch (InterruptedException e) {
@@ -198,6 +200,34 @@ public class ReplicationSourceWALReaderThread extends Thread {
     }
   }
 
+  // Called after a read failure; if it was an EOF on a zero-length head log that other logs are
+  // queued behind (so it was most likely closed) and eof autorecovery is enabled, drop that log.
+  private void handleEofException(Exception e) {
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException)
+        && logQueue.size() > 1 && conf.getBoolean("replication.source.eof.autorecovery", false)) {
+      Path head = logQueue.peek(); // peek once instead of re-reading the head for each use
+      try {
+        if (fs.getFileStatus(head).getLen() == 0) {
+          LOG.warn("Forcing removal of 0 length log in queue: " + head);
+          logQueue.remove();
+          currentPosition = 0;
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Couldn't get file length information about log " + head, ioe);
+      }
+    }
+  }
+
+  /** Returns the lastWalPath of the head queued batch, or the log queue head if none. */
+  public Path getCurrentPath() {
+    WALEntryBatch pending = entryBatchQueue.peek();
+    if (pending == null) {
+      // batch queue is empty, so the reader must be on the head of the log queue
+      return logQueue.peek();
+    }
+    return pending.lastWalPath;
+  }
+
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
     // try not to go over total quota

http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 81fe629..9cf80d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -104,6 +104,7 @@ public class TestReplicationBase {
     conf1.setLong("replication.sleep.before.failover", 2000);
     conf1.setInt("replication.source.maxretriesmultiplier", 10);
     conf1.setFloat("replication.source.ratio", 1.0f);
+    conf1.setBoolean("replication.source.eof.autorecovery", true);
 
     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();

http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index f1b2015..cc3e43b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.mapreduce.Job;
@@ -977,4 +981,80 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     assertEquals(NB_ROWS_IN_BATCH,
       job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
   }
+
+  /** Regression test for HBASE-18137: replication must not get stuck behind an empty WAL. */
+  @Test
+  public void testEmptyWALRecovery() throws Exception {
+    final int rsCount = utility1.getHBaseCluster().getRegionServerThreads().size();
+    final HRegionInfo hri =
+        utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+
+    // Step 1: on every region server, create a zero-length WAL whose name carries the same
+    // WAL group prefix as that server's live WAL.
+    final List<Path> emptyWalPaths = new ArrayList<>();
+    final long suffix = System.currentTimeMillis();
+    for (int rs = 0; rs < rsCount; rs++) {
+      WAL liveWal = utility1.getHBaseCluster().getRegionServer(rs).getWAL(hri);
+      String prefix = AbstractFSWALProvider.getWALPrefixFromWALName(
+          AbstractFSWALProvider.getCurrentFileName(liveWal).getName());
+      Path empty = new Path(utility1.getDataTestDir(), prefix + "." + suffix);
+      utility1.getTestFileSystem().create(empty).close();
+      emptyWalPaths.add(empty);
+    }
+
+    // Step 2: announce each empty WAL as a freshly rolled log so it joins the replication queue.
+    for (int rs = 0; rs < rsCount; rs++) {
+      Path empty = emptyWalPaths.get(rs);
+      Replication replication = (Replication) utility1.getHBaseCluster().getRegionServer(rs)
+          .getReplicationSourceService();
+      replication.preLogRoll(null, empty);
+      replication.postLogRoll(null, empty);
+    }
+
+    // Step 3: block until every replication source is positioned on its empty WAL.
+    waitForLogAdvance(rsCount, emptyWalPaths, false);
+
+    // Step 4: force-roll the live WALs so a real WAL gets queued behind each empty one.
+    for (int rs = 0; rs < rsCount; rs++) {
+      WAL liveWal = utility1.getHBaseCluster().getRegionServer(rs).getWAL(hri);
+      liveWal.rollWriter(true);
+    }
+
+    // Step 5: every source must move off its empty WAL; the wait fails the test otherwise.
+    waitForLogAdvance(rsCount, emptyWalPaths, true);
+
+    // Step 6: new edits now land in the new WAL; a regular put/delete round trip proves that
+    // replication resumed once the empty WAL was skipped.
+    testSimplePutDelete();
+  }
+
+  /**
+   * Waits for the ReplicationSource to start reading from the given paths
+   * @param numRs number of regionservers
+   * @param emptyWalPaths path for each regionserver
+   * @param invert if true, waits until ReplicationSource is NOT reading from the given paths
+   */
+  private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
+      final boolean invert) throws Exception {
+    Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        for (int i = 0; i < numRs; i++) {
+          Replication replicationService = (Replication) utility1.getHBaseCluster()
+              .getRegionServer(i).getReplicationSourceService();
+          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+              .getSources()) {
+            ReplicationSource source = (ReplicationSource) rsi;
+            if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+            if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    });
+  }
 }