You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/06/10 19:46:13 UTC
[1/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs
Repository: hbase
Updated Branches:
refs/heads/branch-1 6860ddca9 -> 650ef5cf5
refs/heads/branch-1.3 6a216c787 -> 6782dfca4
refs/heads/branch-2 eca1ec335 -> 385b79244
refs/heads/master ea64dbef7 -> 384e308e9
HBASE-18137 Replication gets stuck for empty WALs
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/384e308e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/384e308e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/384e308e
Branch: refs/heads/master
Commit: 384e308e9f2387422e76ceb1432d6b2b85a973cf
Parents: ea64dbe
Author: Vincent <vi...@gmail.com>
Authored: Fri Jun 9 18:47:14 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jun 10 10:30:40 2017 -0700
----------------------------------------------------------------------
.../ReplicationSourceShipperThread.java | 2 +-
.../ReplicationSourceWALReaderThread.java | 30 ++++++++
.../hbase/replication/TestReplicationBase.java | 1 +
.../replication/TestReplicationSmallTests.java | 80 ++++++++++++++++++++
4 files changed, 112 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
index d1a8ac2..6807da2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
@@ -303,7 +303,7 @@ public class ReplicationSourceShipperThread extends Thread {
}
public Path getCurrentPath() {
- return this.currentPath;
+ return this.entryReader.getCurrentPath();
}
public long getCurrentPosition() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index ad08866..c1af6e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -189,6 +190,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
+ handleEofException(e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
@@ -198,6 +200,34 @@ public class ReplicationSourceWALReaderThread extends Thread {
}
}
+ // if we get an EOF due to a zero-length log, and there are other logs in queue
+ // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
+ // enabled, then dump the log
+ private void handleEofException(Exception e) {
+ if (e.getCause() instanceof EOFException && logQueue.size() > 1
+ && conf.getBoolean("replication.source.eof.autorecovery", false)) {
+ try {
+ if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+ LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
+ logQueue.remove();
+ currentPosition = 0;
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Couldn't get file length information about log " + logQueue.peek());
+ }
+ }
+ }
+
+ public Path getCurrentPath() {
+ // if we've read some WAL entries, get the Path we read from
+ WALEntryBatch batchQueueHead = entryBatchQueue.peek();
+ if (batchQueueHead != null) {
+ return batchQueueHead.lastWalPath;
+ }
+ // otherwise, we must be currently reading from the head of the log queue
+ return logQueue.peek();
+ }
+
//returns false if we've already exceeded the global quota
private boolean checkQuota() {
// try not to go over total quota
http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 81fe629..9cf80d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -104,6 +104,7 @@ public class TestReplicationBase {
conf1.setLong("replication.sleep.before.failover", 2000);
conf1.setInt("replication.source.maxretriesmultiplier", 10);
conf1.setFloat("replication.source.ratio", 1.0f);
+ conf1.setBoolean("replication.source.eof.autorecovery", true);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index f1b2015..cc3e43b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.mapreduce.Job;
@@ -977,4 +981,80 @@ public class TestReplicationSmallTests extends TestReplicationBase {
assertEquals(NB_ROWS_IN_BATCH,
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
}
+
+ @Test
+ public void testEmptyWALRecovery() throws Exception {
+ final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+
+ // for each RS, create an empty wal with same walGroupId
+ final List<Path> emptyWalPaths = new ArrayList<>();
+ long ts = System.currentTimeMillis();
+ for (int i = 0; i < numRs; i++) {
+ HRegionInfo regionInfo =
+ utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+ String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+ Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
+ utility1.getTestFileSystem().create(emptyWalPath).close();
+ emptyWalPaths.add(emptyWalPath);
+ }
+
+ // inject our empty wal into the replication queue
+ for (int i = 0; i < numRs; i++) {
+ Replication replicationService =
+ (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+ replicationService.preLogRoll(null, emptyWalPaths.get(i));
+ replicationService.postLogRoll(null, emptyWalPaths.get(i));
+ }
+
+ // wait for ReplicationSource to start reading from our empty wal
+ waitForLogAdvance(numRs, emptyWalPaths, false);
+
+ // roll the original wal, which enqueues a new wal behind our empty wal
+ for (int i = 0; i < numRs; i++) {
+ HRegionInfo regionInfo =
+ utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ wal.rollWriter(true);
+ }
+
+ // ReplicationSource should advance past the empty wal, or else the test will fail
+ waitForLogAdvance(numRs, emptyWalPaths, true);
+
+ // we're now writing to the new wal
+ // if everything works, the source should've stopped reading from the empty wal, and start
+ // replicating from the new wal
+ testSimplePutDelete();
+ }
+
+ /**
+ * Waits for the ReplicationSource to start reading from the given paths
+ * @param numRs number of regionservers
+ * @param emptyWalPaths path for each regionserver
+ * @param invert if true, waits until ReplicationSource is NOT reading from the given paths
+ */
+ private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
+ final boolean invert) throws Exception {
+ Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ for (int i = 0; i < numRs; i++) {
+ Replication replicationService = (Replication) utility1.getHBaseCluster()
+ .getRegionServer(i).getReplicationSourceService();
+ for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+ .getSources()) {
+ ReplicationSource source = (ReplicationSource) rsi;
+ if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+ return false;
+ }
+ if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ });
+ }
}
[2/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs
Posted by ap...@apache.org.
HBASE-18137 Replication gets stuck for empty WALs
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/650ef5cf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/650ef5cf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/650ef5cf
Branch: refs/heads/branch-1
Commit: 650ef5cf59ee7f6e4c219a9043b66a814da52f19
Parents: 6860ddc
Author: Vincent <vi...@gmail.com>
Authored: Fri Jun 9 18:36:23 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jun 10 11:29:51 2017 -0700
----------------------------------------------------------------------
.../regionserver/ReplicationSource.java | 2 +-
.../ReplicationSourceWALReaderThread.java | 30 +++++++
.../hbase/replication/TestReplicationBase.java | 1 +
.../replication/TestReplicationSmallTests.java | 82 ++++++++++++++++++++
4 files changed, 114 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/650ef5cf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 6954ea2..8378b9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -931,7 +931,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
public Path getCurrentPath() {
- return this.currentPath;
+ return this.entryReader.getCurrentPath();
}
public long getCurrentPosition() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/650ef5cf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 6f1c641..40828b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -188,6 +189,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
+ handleEofException(e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
@@ -197,6 +199,34 @@ public class ReplicationSourceWALReaderThread extends Thread {
}
}
+ // if we get an EOF due to a zero-length log, and there are other logs in queue
+ // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
+ // enabled, then dump the log
+ private void handleEofException(Exception e) {
+ if (e.getCause() instanceof EOFException && logQueue.size() > 1
+ && conf.getBoolean("replication.source.eof.autorecovery", false)) {
+ try {
+ if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+ LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
+ logQueue.remove();
+ currentPosition = 0;
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Couldn't get file length information about log " + logQueue.peek());
+ }
+ }
+ }
+
+ public Path getCurrentPath() {
+ // if we've read some WAL entries, get the Path we read from
+ WALEntryBatch batchQueueHead = entryBatchQueue.peek();
+ if (batchQueueHead != null) {
+ return batchQueueHead.lastWalPath;
+ }
+ // otherwise, we must be currently reading from the head of the log queue
+ return logQueue.peek();
+ }
+
//returns false if we've already exceeded the global quota
private boolean checkQuota() {
// try not to go over total quota
http://git-wip-us.apache.org/repos/asf/hbase/blob/650ef5cf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 498d26a..d0f40a6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -102,6 +102,7 @@ public class TestReplicationBase {
conf1.setLong("replication.sleep.before.failover", 2000);
conf1.setInt("replication.source.maxretriesmultiplier", 10);
conf1.setFloat("replication.source.ratio", 1.0f);
+ conf1.setBoolean("replication.source.eof.autorecovery", true);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
http://git-wip-us.apache.org/repos/asf/hbase/blob/650ef5cf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index d56834c..f94ad5a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -31,6 +31,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -56,10 +58,14 @@ import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
@@ -809,4 +815,80 @@ public class TestReplicationSmallTests extends TestReplicationBase {
tableName.getNameAsString()};
runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0);
}
+
+ @Test
+ public void testEmptyWALRecovery() throws Exception {
+ final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+
+ // for each RS, create an empty wal with same walGroupId
+ final List<Path> emptyWalPaths = new ArrayList<>();
+ long ts = System.currentTimeMillis();
+ for (int i = 0; i < numRs; i++) {
+ HRegionInfo regionInfo =
+ utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ Path currentWalPath = DefaultWALProvider.getCurrentFileName(wal);
+ String walGroupId = DefaultWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+ Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
+ utility1.getTestFileSystem().create(emptyWalPath).close();
+ emptyWalPaths.add(emptyWalPath);
+ }
+
+ // inject our empty wal into the replication queue
+ for (int i = 0; i < numRs; i++) {
+ Replication replicationService =
+ (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+ replicationService.preLogRoll(null, emptyWalPaths.get(i));
+ replicationService.postLogRoll(null, emptyWalPaths.get(i));
+ }
+
+ // wait for ReplicationSource to start reading from our empty wal
+ waitForLogAdvance(numRs, emptyWalPaths, false);
+
+ // roll the original wal, which enqueues a new wal behind our empty wal
+ for (int i = 0; i < numRs; i++) {
+ HRegionInfo regionInfo =
+ utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ wal.rollWriter(true);
+ }
+
+ // ReplicationSource should advance past the empty wal, or else the test will fail
+ waitForLogAdvance(numRs, emptyWalPaths, true);
+
+ // we're now writing to the new wal
+ // if everything works, the source should've stopped reading from the empty wal, and start
+ // replicating from the new wal
+ testSimplePutDelete();
+ }
+
+ /**
+ * Waits for the ReplicationSource to start reading from the given paths
+ * @param numRs number of regionservers
+ * @param emptyWalPaths path for each regionserver
+ * @param invert if true, waits until ReplicationSource is NOT reading from the given paths
+ */
+ private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
+ final boolean invert) throws Exception {
+ Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ for (int i = 0; i < numRs; i++) {
+ Replication replicationService = (Replication) utility1.getHBaseCluster()
+ .getRegionServer(i).getReplicationSourceService();
+ for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+ .getSources()) {
+ ReplicationSource source = (ReplicationSource) rsi;
+ if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+ return false;
+ }
+ if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ });
+ }
}
[3/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs
Posted by ap...@apache.org.
HBASE-18137 Replication gets stuck for empty WALs
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6782dfca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6782dfca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6782dfca
Branch: refs/heads/branch-1.3
Commit: 6782dfca4f3a2f5e02cc60a7c04d8d5d95ebc36e
Parents: 6a216c7
Author: Vincent <vi...@gmail.com>
Authored: Wed Jun 7 14:48:45 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jun 10 12:26:12 2017 -0700
----------------------------------------------------------------------
.../regionserver/ReplicationSource.java | 16 ++--
.../hbase/replication/TestReplicationBase.java | 1 +
.../replication/TestReplicationSmallTests.java | 83 ++++++++++++++++++++
3 files changed, 94 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6782dfca/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 65f581a..2285292 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -542,9 +542,9 @@ public class ReplicationSource extends Thread
terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
}
}
+ int sleepMultiplier = 1;
// Loop until we close down
while (isWorkerActive()) {
- int sleepMultiplier = 1;
// Sleep until replication is enabled again
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -622,7 +622,7 @@ public class ReplicationSource extends Thread
if (considerDumping &&
sleepMultiplier == maxRetriesMultiplier &&
- processEndOfFile()) {
+ processEndOfFile(false)) {
continue;
}
}
@@ -749,7 +749,7 @@ public class ReplicationSource extends Thread
}
// If we didn't get anything and the queue has an object, it means we
// hit the end of the file for sure
- return seenEntries == 0 && processEndOfFile();
+ return seenEntries == 0 && processEndOfFile(false);
}
/**
@@ -930,11 +930,12 @@ public class ReplicationSource extends Thread
// which throws a NPE if we open a file before any data node has the most recent block
// Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
LOG.warn("Got NPE opening reader, will retry.");
- } else if (sleepMultiplier >= maxRetriesMultiplier) {
+ } else if (sleepMultiplier >= maxRetriesMultiplier
+ && conf.getBoolean("replication.source.eof.autorecovery", false)) {
// TODO Need a better way to determine if a file is really gone but
// TODO without scanning all logs dir
LOG.warn("Waited too long for this file, considering dumping");
- return !processEndOfFile();
+ return !processEndOfFile(true);
}
}
return true;
@@ -1100,7 +1101,7 @@ public class ReplicationSource extends Thread
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
justification = "Yeah, this is how it works")
- protected boolean processEndOfFile() {
+ protected boolean processEndOfFile(boolean dumpOnlyIfZeroLength) {
// We presume this means the file we're reading is closed.
if (this.queue.size() != 0) {
// -1 means the wal wasn't closed cleanly.
@@ -1135,6 +1136,9 @@ public class ReplicationSource extends Thread
LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
+ ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
}
+ if (dumpOnlyIfZeroLength && stat.getLen() != 0) {
+ return false;
+ }
this.currentPath = null;
this.repLogReader.finishCurrentFile();
this.reader = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/6782dfca/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index e52a600..01d9335 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -101,6 +101,7 @@ public class TestReplicationBase {
conf1.setLong("replication.sleep.before.failover", 2000);
conf1.setInt("replication.source.maxretriesmultiplier", 10);
conf1.setFloat("replication.source.ratio", 1.0f);
+ conf1.setBoolean("replication.source.eof.autorecovery", true);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
http://git-wip-us.apache.org/repos/asf/hbase/blob/6782dfca/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 42a127f..e1f9f01 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -30,6 +30,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ClusterStatus;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
@@ -53,9 +55,13 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -758,4 +764,81 @@ public class TestReplicationSmallTests extends TestReplicationBase {
}
}
}
+
+ // "replication.source.eof.autorecovery" must be true for this to pass
+ @Test
+ public void testEmptyWALRecovery() throws Exception {
+ final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+
+ // for each RS, create an empty wal with same walGroupId
+ final List<Path> emptyWalPaths = new ArrayList<>();
+ long ts = System.currentTimeMillis();
+ for (int i = 0; i < numRs; i++) {
+ HRegionInfo regionInfo =
+ utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ Path currentWalPath = DefaultWALProvider.getCurrentFileName(wal);
+ String walGroupId = DefaultWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+ Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
+ utility1.getTestFileSystem().create(emptyWalPath).close();
+ emptyWalPaths.add(emptyWalPath);
+ }
+
+ // inject our empty wal into the replication queue
+ for (int i = 0; i < numRs; i++) {
+ Replication replicationService =
+ (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+ replicationService.preLogRoll(null, emptyWalPaths.get(i));
+ replicationService.postLogRoll(null, emptyWalPaths.get(i));
+ }
+
+ // wait for ReplicationSource to start reading from our empty wal
+ waitForLogAdvance(numRs, emptyWalPaths, false);
+
+ // roll the original wal, which enqueues a new wal behind our empty wal
+ for (int i = 0; i < numRs; i++) {
+ HRegionInfo regionInfo =
+ utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ wal.rollWriter(true);
+ }
+
+ // ReplicationSource should advance past the empty wal, or else the test will fail
+ waitForLogAdvance(numRs, emptyWalPaths, true);
+
+ // we're now writing to the new wal
+ // if everything works, the source should've stopped reading from the empty wal, and start
+ // replicating from the new wal
+ testSimplePutDelete();
+ }
+
+ /**
+ * Waits for the ReplicationSource to start reading from the given paths
+ * @param numRs number of regionservers
+ * @param emptyWalPaths path for each regionserver
+ * @param invert if true, waits until ReplicationSource is NOT reading from the given paths
+ */
+ private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
+ final boolean invert) throws Exception {
+ Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ for (int i = 0; i < numRs; i++) {
+ Replication replicationService = (Replication) utility1.getHBaseCluster()
+ .getRegionServer(i).getReplicationSourceService();
+ for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+ .getSources()) {
+ ReplicationSource source = (ReplicationSource) rsi;
+ if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+ return false;
+ }
+ if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ });
+ }
}
[4/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs
Posted by ap...@apache.org.
HBASE-18137 Replication gets stuck for empty WALs
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/385b7924
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/385b7924
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/385b7924
Branch: refs/heads/branch-2
Commit: 385b792446ea1b0c58b7365904d677ba48eec930
Parents: eca1ec3
Author: Vincent <vi...@gmail.com>
Authored: Fri Jun 9 18:47:14 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Jun 10 12:45:40 2017 -0700
----------------------------------------------------------------------
.../ReplicationSourceShipperThread.java | 2 +-
.../ReplicationSourceWALReaderThread.java | 30 ++++++++
.../hbase/replication/TestReplicationBase.java | 1 +
.../replication/TestReplicationSmallTests.java | 80 ++++++++++++++++++++
4 files changed, 112 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
index d1a8ac2..6807da2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
@@ -303,7 +303,7 @@ public class ReplicationSourceShipperThread extends Thread {
}
public Path getCurrentPath() {
- return this.currentPath;
+ return this.entryReader.getCurrentPath();
}
public long getCurrentPosition() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index ad08866..c1af6e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -189,6 +190,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
+ handleEofException(e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
@@ -198,6 +200,34 @@ public class ReplicationSourceWALReaderThread extends Thread {
}
}
+ // if we get an EOF due to a zero-length log, and there are other logs in queue
+ // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
+ // enabled, then dump the log
+ private void handleEofException(Exception e) {
+ if (e.getCause() instanceof EOFException && logQueue.size() > 1
+ && conf.getBoolean("replication.source.eof.autorecovery", false)) {
+ try {
+ if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+ LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
+ logQueue.remove();
+ currentPosition = 0;
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Couldn't get file length information about log " + logQueue.peek());
+ }
+ }
+ }
+
+ public Path getCurrentPath() {
+ // if we've read some WAL entries, get the Path we read from
+ WALEntryBatch batchQueueHead = entryBatchQueue.peek();
+ if (batchQueueHead != null) {
+ return batchQueueHead.lastWalPath;
+ }
+ // otherwise, we must be currently reading from the head of the log queue
+ return logQueue.peek();
+ }
+
//returns false if we've already exceeded the global quota
private boolean checkQuota() {
// try not to go over total quota
http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 81fe629..9cf80d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -104,6 +104,7 @@ public class TestReplicationBase {
conf1.setLong("replication.sleep.before.failover", 2000);
conf1.setInt("replication.source.maxretriesmultiplier", 10);
conf1.setFloat("replication.source.ratio", 1.0f);
+ conf1.setBoolean("replication.source.eof.autorecovery", true);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index f1b2015..cc3e43b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.mapreduce.Job;
@@ -977,4 +981,80 @@ public class TestReplicationSmallTests extends TestReplicationBase {
assertEquals(NB_ROWS_IN_BATCH,
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
}
+
+ @Test
+ public void testEmptyWALRecovery() throws Exception {
+ final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+
+ // for each RS, create an empty wal with same walGroupId
+ final List<Path> emptyWalPaths = new ArrayList<>();
+ long ts = System.currentTimeMillis();
+ for (int i = 0; i < numRs; i++) {
+ HRegionInfo regionInfo =
+ utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+ String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+ Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
+ utility1.getTestFileSystem().create(emptyWalPath).close();
+ emptyWalPaths.add(emptyWalPath);
+ }
+
+ // inject our empty wal into the replication queue
+ for (int i = 0; i < numRs; i++) {
+ Replication replicationService =
+ (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+ replicationService.preLogRoll(null, emptyWalPaths.get(i));
+ replicationService.postLogRoll(null, emptyWalPaths.get(i));
+ }
+
+ // wait for ReplicationSource to start reading from our empty wal
+ waitForLogAdvance(numRs, emptyWalPaths, false);
+
+ // roll the original wal, which enqueues a new wal behind our empty wal
+ for (int i = 0; i < numRs; i++) {
+ HRegionInfo regionInfo =
+ utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ wal.rollWriter(true);
+ }
+
+ // ReplicationSource should advance past the empty wal, or else the test will fail
+ waitForLogAdvance(numRs, emptyWalPaths, true);
+
+ // we're now writing to the new wal
+ // if everything works, the source should've stopped reading from the empty wal, and start
+ // replicating from the new wal
+ testSimplePutDelete();
+ }
+
+ /**
+ * Waits for the ReplicationSource to start reading from the given paths
+ * @param numRs number of regionservers
+ * @param emptyWalPaths path for each regionserver
+ * @param invert if true, waits until ReplicationSource is NOT reading from the given paths
+ */
+ private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
+ final boolean invert) throws Exception {
+ Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ for (int i = 0; i < numRs; i++) {
+ Replication replicationService = (Replication) utility1.getHBaseCluster()
+ .getRegionServer(i).getReplicationSourceService();
+ for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+ .getSources()) {
+ ReplicationSource source = (ReplicationSource) rsi;
+ if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+ return false;
+ }
+ if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ });
+ }
}