You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/09 07:22:20 UTC
[13/20] hbase git commit: HBASE-20206 WALEntryStream should not
switch WAL file silently
HBASE-20206 WALEntryStream should not switch WAL file silently
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/16a4dd6b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/16a4dd6b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/16a4dd6b
Branch: refs/heads/branch-2
Commit: 16a4dd6b8f98cb1116007764cb86f6835a7ca84f
Parents: 644bfe3
Author: zhangduo <zh...@apache.org>
Authored: Sun Mar 18 18:09:45 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800
----------------------------------------------------------------------
.../replication/ReplicationQueueStorage.java | 2 +-
.../replication/ZKReplicationQueueStorage.java | 39 ++--
.../replication/TestReplicationStateBasic.java | 3 +-
.../TestZKReplicationQueueStorage.java | 6 +-
.../RecoveredReplicationSource.java | 33 ----
.../RecoveredReplicationSourceShipper.java | 13 +-
.../regionserver/ReplicationSource.java | 2 +-
.../regionserver/ReplicationSourceManager.java | 100 +++++-----
.../regionserver/ReplicationSourceShipper.java | 96 +++++-----
.../ReplicationSourceWALReader.java | 50 ++++-
.../SerialReplicationSourceWALReader.java | 29 ++-
.../replication/regionserver/WALEntryBatch.java | 22 +++
.../regionserver/WALEntryStream.java | 5 +-
.../TestReplicationSourceManager.java | 17 +-
.../regionserver/TestWALEntryStream.java | 188 ++++++++++++++-----
15 files changed, 384 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 4c93da6..cfe9c9c 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -63,7 +63,7 @@ public interface ReplicationQueueStorage {
* @param serverName the name of the regionserver
* @param queueId a String that identifies the queue
* @param fileName name of the WAL
- * @param position the current position in the file
+ * @param position the current position in the file. Ignored if less than or equal to 0.
* @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
*/
void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index adbf259..63f43e8 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -193,27 +193,28 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
Map<String, Long> lastSeqIds) throws ReplicationException {
try {
List<ZKUtilOp> listOfOps = new ArrayList<>();
- listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
- ZKUtil.positionToByteArray(position)));
+ if (position > 0) {
+ listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
+ ZKUtil.positionToByteArray(position)));
+ }
// Persist the max sequence id(s) of regions for serial replication atomically.
- if (lastSeqIds != null && lastSeqIds.size() > 0) {
- for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
- String peerId = new ReplicationQueueInfo(queueId).getPeerId();
- String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
- /*
- * Make sure the existence of path
- * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
- * multiOrSequential() method said, if received a NodeExistsException, all operations will
- * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
- * because only need to make sure that update file position and sequence id atomically.
- */
- ZKUtil.createWithParents(zookeeper, path);
- // Persist the max sequence id of region to zookeeper.
- listOfOps
- .add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
- }
+ for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
+ String peerId = new ReplicationQueueInfo(queueId).getPeerId();
+ String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
+ /*
+ * Make sure the existence of path
+ * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
+ * multiOrSequential() method said, if received a NodeExistsException, all operations will
+ * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
+ * because only need to make sure that update file position and sequence id atomically.
+ */
+ ZKUtil.createWithParents(zookeeper, path);
+ // Persist the max sequence id of region to zookeeper.
+ listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+ }
+ if (!listOfOps.isEmpty()) {
+ ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
}
- ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
} catch (KeeperException e) {
throw new ReplicationException("Failed to set log position (serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 5999c1f..21b09aa 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;
@@ -127,7 +128,7 @@ public abstract class TestReplicationStateBasic {
assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
- rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
+ rqs.setWALPosition(server3, "qId5", "filename4", 354L, Collections.emptyMap());
assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
index 8ff52f3..c813870 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
@@ -136,9 +137,10 @@ public class TestZKReplicationQueueStorage {
for (int i = 0; i < 10; i++) {
assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
- STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null);
+ STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
+ Collections.emptyMap());
STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
- null);
+ Collections.emptyMap());
}
for (int i = 0; i < 10; i++) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 169b469..f1ad99d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -64,38 +63,6 @@ public class RecoveredReplicationSource extends ReplicationSource {
return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
}
- private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
- BlockingQueue<WALEntryBatch> entryBatchQueue, Path currentPath) throws InterruptedException {
- LOG.trace("Didn't read any new entries from WAL");
- // we're done with queue recovery, shut ourself down
- reader.setReaderRunning(false);
- // shuts down shipper thread immediately
- entryBatchQueue.put(new WALEntryBatch(0, currentPath));
- }
-
- @Override
- protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
- PriorityBlockingQueue<Path> queue, long startPosition) {
- if (replicationPeer.getPeerConfig().isSerial()) {
- return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter,
- this) {
-
- @Override
- protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
- handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
- }
- };
- } else {
- return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) {
-
- @Override
- protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
- handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
- }
- };
- }
- }
-
public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths =
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 1ae5cb9..d74211e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -48,13 +48,10 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
}
@Override
- protected void postShipEdits(WALEntryBatch entryBatch) {
- if (entryBatch.getWalEntries().isEmpty()) {
- LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
- + source.getQueueId());
- source.getSourceMetrics().incrCompletedRecoveryQueue();
- setWorkerState(WorkerState.FINISHED);
- }
+ protected void noMoreData() {
+ LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, source.getQueueId());
+ source.getSourceMetrics().incrCompletedRecoveryQueue();
+ setWorkerState(WorkerState.FINISHED);
}
@Override
@@ -63,7 +60,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
}
@Override
- public long getStartPosition() {
+ long getStartPosition() {
long startPosition = getRecoveredQueueStartPos();
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3480919..236c575 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -315,7 +315,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
return new ReplicationSourceShipper(conf, walGroupId, queue, this);
}
- protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
+ private ReplicationSourceWALReader createNewWALReader(String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
return replicationPeer.getPeerConfig().isSerial()
? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 06fe977..23e1115 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -82,25 +83,28 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* operations.</li>
* <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
* {@link #addPeer(String)}, {@link #removePeer(String)},
- * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}. {@link #walsById}
- * is a ConcurrentHashMap and there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So
- * there is no race between {@link #addPeer(String)} and {@link #removePeer(String)}.
- * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}.
- * So no race with {@link #addPeer(String)}. {@link #removePeer(String)} will terminate the
- * {@link ReplicationSourceInterface} firstly, then remove the wals from {@link #walsById}. So no
- * race with {@link #removePeer(String)}. The only case need synchronized is
- * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}.</li>
+ * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
+ * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
+ * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
+ * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
+ * called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
+ * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
+ * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
+ * case need synchronized is {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
+ * {@link #preLogRoll(Path)}.</li>
* <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
- * modify it, {@link #removePeer(String)} , {@link #cleanOldLogs(SortedSet, String, String)} and
+ * modify it, {@link #removePeer(String)} ,
+ * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
* {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
- * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}.
- * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
- * remove the wals from {@link #walsByIdRecoveredQueues}. And
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()} will add the wals to
- * {@link #walsByIdRecoveredQueues} firstly, then start up a {@link ReplicationSourceInterface}. So
- * there is no race here. For {@link ReplicationSourceManager.NodeFailoverWorker#run()} and
- * {@link #removePeer(String)}, there is already synchronized on {@link #oldsources}. So no need
- * synchronized on {@link #walsByIdRecoveredQueues}.</li>
+ * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
+ * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
+ * {@link ReplicationSourceInterface} firstly, then remove the wals from
+ * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
+ * will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a
+ * {@link ReplicationSourceInterface}. So there is no race here. For
+ * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
+ * is already synchronized on {@link #oldsources}. So no need synchronized on
+ * {@link #walsByIdRecoveredQueues}.</li>
* <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
* <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
* to-be-removed peer.</li>
@@ -124,11 +128,11 @@ public class ReplicationSourceManager implements ReplicationListener {
// All logs we are currently tracking
// Index structure of the map is: queue_id->logPrefix/logGroup->logs
// For normal replication source, the peer id is same with the queue id
- private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsById;
+ private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
// Logs for recovered sources we are currently tracking
// the map is: queue_id->logPrefix/logGroup->logs
// For recovered source, the queue id's format is peer_id-servername-*
- private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
+ private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
private final Configuration conf;
private final FileSystem fs;
@@ -335,14 +339,14 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronized on latestPaths to avoid missing the new log
synchronized (this.latestPaths) {
this.sources.put(peerId, src);
- Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
+ Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
this.walsById.put(peerId, walsByGroup);
// Add the latest wal to that source's queue
if (this.latestPaths.size() > 0) {
for (Path logPath : latestPaths) {
String name = logPath.getName();
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
- SortedSet<String> logs = new TreeSet<>();
+ NavigableSet<String> logs = new TreeSet<>();
logs.add(name);
walsByGroup.put(walPrefix, logs);
// Abort RS and throw exception to make add peer failed
@@ -474,50 +478,51 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* This method will log the current position to storage. And also clean old logs from the
* replication queue.
- * @param log Path to the log currently being replicated
* @param queueId id of the replication queue
- * @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
+ * @param entryBatch the wal entry batch we just shipped
*/
- public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
- Map<String, Long> lastSeqIds, boolean queueRecovered) {
- String fileName = log.getName();
+ public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
+ WALEntryBatch entryBatch) {
+ String fileName = entryBatch.getLastWalPath().getName();
abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
- position, lastSeqIds));
- cleanOldLogs(fileName, queueId, queueRecovered);
+ entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+ cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
}
/**
* Cleans a log file and all older logs from replication queue. Called when we are sure that a log
* file is closed and has no more entries.
* @param log Path to the log
+ * @param inclusive whether we should also remove the given log file
* @param queueId id of the replication queue
* @param queueRecovered Whether this is a recovered queue
*/
@VisibleForTesting
- void cleanOldLogs(String log, String queueId, boolean queueRecovered) {
+ void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
if (queueRecovered) {
- SortedSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
- if (wals != null && !wals.first().equals(log)) {
- cleanOldLogs(wals, log, queueId);
+ NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
+ if (wals != null) {
+ cleanOldLogs(wals, log, inclusive, queueId);
}
} else {
// synchronized on walsById to avoid race with preLogRoll
synchronized (this.walsById) {
- SortedSet<String> wals = walsById.get(queueId).get(logPrefix);
+ NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
if (wals != null && !wals.first().equals(log)) {
- cleanOldLogs(wals, log, queueId);
+ cleanOldLogs(wals, log, inclusive, queueId);
}
}
}
}
- private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
- SortedSet<String> walSet = wals.headSet(key);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+ private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
+ NavigableSet<String> walSet = wals.headSet(key, inclusive);
+ if (walSet.isEmpty()) {
+ return;
}
+ LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
for (String wal : walSet) {
abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
}
@@ -542,11 +547,12 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronized on walsById to avoid race with cleanOldLogs
synchronized (this.walsById) {
// Update walsById map
- for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
+ for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
+ .entrySet()) {
String peerId = entry.getKey();
- Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
+ Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
boolean existingPrefix = false;
- for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
+ for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
SortedSet<String> wals = walsEntry.getValue();
if (this.sources.isEmpty()) {
// If there's no slaves, don't need to keep the old wals since
@@ -560,8 +566,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
if (!existingPrefix) {
// The new log belongs to a new group, add it into this peer
- LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
- SortedSet<String> wals = new TreeSet<>();
+ LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
+ NavigableSet<String> wals = new TreeSet<>();
wals.add(logName);
walsByPrefix.put(logPrefix, wals);
}
@@ -700,11 +706,11 @@ public class ReplicationSourceManager implements ReplicationListener {
continue;
}
// track sources in walsByIdRecoveredQueues
- Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
+ Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(queueId, walsByGroup);
for (String wal : walsSet) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
- SortedSet<String> wals = walsByGroup.get(walPrefix);
+ NavigableSet<String> wals = walsByGroup.get(walPrefix);
if (wals == null) {
wals = new TreeSet<>();
walsByGroup.put(walPrefix, wals);
@@ -749,7 +755,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return a sorted set of wal names
*/
@VisibleForTesting
- Map<String, Map<String, SortedSet<String>>> getWALs() {
+ Map<String, Map<String, NavigableSet<String>>> getWALs() {
return Collections.unmodifiableMap(walsById);
}
@@ -758,7 +764,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return a sorted set of wal names
*/
@VisibleForTesting
- Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
+ Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
return Collections.unmodifiableMap(walsByIdRecoveredQueues);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index aa5251e..2097d00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -52,17 +51,18 @@ public class ReplicationSourceShipper extends Thread {
FINISHED, // The worker is done processing a recovered queue
}
- protected final Configuration conf;
+ private final Configuration conf;
protected final String walGroupId;
protected final PriorityBlockingQueue<Path> queue;
- protected final ReplicationSourceInterface source;
+ private final ReplicationSourceInterface source;
// Last position in the log that we sent to ZooKeeper
- protected long lastLoggedPosition = -1;
+ // It will be accessed by the stats thread so make it volatile
+ private volatile long currentPosition = -1;
// Path of the current log
- protected volatile Path currentPath;
+ private Path currentPath;
// Current state of the worker thread
- private WorkerState state;
+ private volatile WorkerState state;
protected ReplicationSourceWALReader entryReader;
// How long should we sleep for each retry
@@ -97,8 +97,12 @@ public class ReplicationSourceShipper extends Thread {
}
try {
WALEntryBatch entryBatch = entryReader.take();
- shipEdits(entryBatch);
- postShipEdits(entryBatch);
+ // the NO_MORE_DATA instance has no path so do not call shipEdits
+ if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
+ noMoreData();
+ } else {
+ shipEdits(entryBatch);
+ }
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
@@ -113,7 +117,7 @@ public class ReplicationSourceShipper extends Thread {
}
// To be implemented by recovered shipper
- protected void postShipEdits(WALEntryBatch entryBatch) {
+ protected void noMoreData() {
}
// To be implemented by recovered shipper
@@ -123,14 +127,11 @@ public class ReplicationSourceShipper extends Thread {
/**
* Do the shipping logic
*/
- protected final void shipEdits(WALEntryBatch entryBatch) {
+ private void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
- long lastReadPosition = entryBatch.getLastWalPosition();
- currentPath = entryBatch.getLastWalPath();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
- if (lastLoggedPosition != lastReadPosition) {
- updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
+ if (updateLogPosition(entryBatch)) {
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
@@ -168,16 +169,12 @@ public class ReplicationSourceShipper extends Thread {
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
-
- if (this.lastLoggedPosition != lastReadPosition) {
- // Clean up hfile references
- int size = entries.size();
- for (int i = 0; i < size; i++) {
- cleanUpHFileRefs(entries.get(i).getEdit());
- }
- // Log and clean up WAL logs
- updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
+ // Clean up hfile references
+ for (Entry entry : entries) {
+ cleanUpHFileRefs(entry.getEdit());
}
+ // Log and clean up WAL logs
+ updateLogPosition(entryBatch);
source.postShipEdits(entries, currentSize);
// FIXME check relationship between wal group and overall
@@ -224,10 +221,29 @@ public class ReplicationSourceShipper extends Thread {
}
}
- private void updateLogPosition(long lastReadPosition, Map<String, Long> lastSeqIds) {
- source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
- lastReadPosition, lastSeqIds, source.isRecovered());
- lastLoggedPosition = lastReadPosition;
+ private boolean updateLogPosition(WALEntryBatch batch) {
+ boolean updated = false;
+ // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
+ // record on zk, so let's call it. The last wal position may be zero if end of file is true and
+ // there is no entry in the batch. It is OK because the queue storage will ignore the zero
+ // position and the file will be removed soon in cleanOldLogs.
+ if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
+ batch.getLastWalPosition() != currentPosition) {
+ source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
+ source.isRecovered(), batch);
+ updated = true;
+ }
+ // if end of file is true, then we can just skip to the next file in queue.
+ // the only exception is for recovered queue, if we reach the end of the queue, then there will
+ // be no more files so here the currentPath may be null.
+ if (batch.isEndOfFile()) {
+ currentPath = entryReader.getCurrentPath();
+ currentPosition = 0L;
+ } else {
+ currentPath = batch.getLastWalPath();
+ currentPosition = batch.getLastWalPosition();
+ }
+ return updated;
}
public void startup(UncaughtExceptionHandler handler) {
@@ -236,39 +252,31 @@ public class ReplicationSourceShipper extends Thread {
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
}
- public PriorityBlockingQueue<Path> getLogQueue() {
- return this.queue;
- }
-
- public Path getCurrentPath() {
- return this.entryReader.getCurrentPath();
+ Path getCurrentPath() {
+ return entryReader.getCurrentPath();
}
- public long getCurrentPosition() {
- return this.lastLoggedPosition;
+ long getCurrentPosition() {
+ return currentPosition;
}
- public void setWALReader(ReplicationSourceWALReader entryReader) {
+ void setWALReader(ReplicationSourceWALReader entryReader) {
this.entryReader = entryReader;
}
- public long getStartPosition() {
+ long getStartPosition() {
return 0;
}
- protected final boolean isActive() {
+ private boolean isActive() {
return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
}
- public void setWorkerState(WorkerState state) {
+ protected final void setWorkerState(WorkerState state) {
this.state = state;
}
- public WorkerState getWorkerState() {
- return state;
- }
-
- public void stopWorker() {
+ void stopWorker() {
setWorkerState(WorkerState.STOPPED);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index b125133..2154856 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -59,7 +59,7 @@ class ReplicationSourceWALReader extends Thread {
private final WALEntryFilter filter;
private final ReplicationSource source;
- protected final BlockingQueue<WALEntryBatch> entryBatchQueue;
+ private final BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total
private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
@@ -130,6 +130,7 @@ class ReplicationSourceWALReader extends Thread {
continue;
}
WALEntryBatch batch = readWALEntries(entryStream);
+ currentPosition = entryStream.getPosition();
if (batch != null) {
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
@@ -138,9 +139,8 @@ class ReplicationSourceWALReader extends Thread {
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
handleEmptyWALEntryBatch(entryStream.getCurrentPath());
+ entryStream.reset(); // reuse stream
}
- currentPosition = entryStream.getPosition();
- entryStream.reset(); // reuse stream
}
} catch (IOException e) { // stream related
if (sleepMultiplier < maxRetriesMultiplier) {
@@ -173,13 +173,31 @@ class ReplicationSourceWALReader extends Thread {
batch.getNbEntries() >= replicationBatchCountCapacity;
}
+ protected static final boolean switched(WALEntryStream entryStream, Path path) {
+ return !path.equals(entryStream.getCurrentPath());
+ }
+
protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
+ Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
- return null;
+ // check whether we have switched a file
+ if (currentPath != null && switched(entryStream, currentPath)) {
+ return WALEntryBatch.endOfFile(currentPath);
+ } else {
+ return null;
+ }
+ }
+ if (currentPath != null) {
+ if (switched(entryStream, currentPath)) {
+ return WALEntryBatch.endOfFile(currentPath);
+ }
+ } else {
+ // when reading from the entry stream for the first time we will enter here
+ currentPath = entryStream.getCurrentPath();
}
WALEntryBatch batch = createBatch(entryStream);
- do {
+ for (;;) {
Entry entry = entryStream.next();
batch.setLastWalPosition(entryStream.getPosition());
entry = filterEntry(entry);
@@ -188,13 +206,29 @@ class ReplicationSourceWALReader extends Thread {
break;
}
}
- } while (entryStream.hasNext());
+ boolean hasNext = entryStream.hasNext();
+ // always return if we have switched to a new file
+ if (switched(entryStream, currentPath)) {
+ batch.setEndOfFile(true);
+ break;
+ }
+ if (!hasNext) {
+ break;
+ }
+ }
return batch;
}
- protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+ private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
- Thread.sleep(sleepForRetries);
+ if (source.isRecovered()) {
+ // we're done with queue recovery, shut ourself down
+ setReaderRunning(false);
+ // shuts down shipper thread immediately
+ entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
+ } else {
+ Thread.sleep(sleepForRetries);
+ }
}
// if we get an EOF due to a zero-length log, and there are other logs in queue
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 5e9a9f6..9edcc8a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -53,12 +53,26 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
@Override
protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
+ Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
- return null;
+ // check whether we have switched a file
+ if (currentPath != null && switched(entryStream, currentPath)) {
+ return WALEntryBatch.endOfFile(currentPath);
+ } else {
+ return null;
+ }
+ }
+ if (currentPath != null) {
+ if (switched(entryStream, currentPath)) {
+ return WALEntryBatch.endOfFile(currentPath);
+ }
+ } else {
+ // when reading from the entry stream for the first time we will enter here
+ currentPath = entryStream.getCurrentPath();
}
long positionBefore = entryStream.getPosition();
WALEntryBatch batch = createBatch(entryStream);
- do {
+ for (;;) {
Entry entry = entryStream.peek();
boolean doFiltering = true;
if (firstCellInEntryBeforeFiltering == null) {
@@ -99,7 +113,16 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
// actually remove the entry.
removeEntryFromStream(entryStream, batch);
}
- } while (entryStream.hasNext());
+ boolean hasNext = entryStream.hasNext();
+ // always return if we have switched to a new file.
+ if (switched(entryStream, currentPath)) {
+ batch.setEndOfFile(true);
+ break;
+ }
+ if (!hasNext) {
+ break;
+ }
+ }
return batch;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 31c3ac7..960d473 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -30,6 +30,10 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
class WALEntryBatch {
+
+ // used by recovered replication queue to indicate that all the entries have been read.
+ public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
+
private List<Entry> walEntries;
// last WAL that was read
private Path lastWalPath;
@@ -43,6 +47,8 @@ class WALEntryBatch {
private long heapSize = 0;
// save the last sequenceid for each region if the table has serial-replication scope
private Map<String, Long> lastSeqIds = new HashMap<>();
+ // indicate that this is the end of the current file
+ private boolean endOfFile;
/**
* @param lastWalPath Path of the WAL the last entry in this batch was read from
@@ -52,6 +58,14 @@ class WALEntryBatch {
this.lastWalPath = lastWalPath;
}
+
+ static WALEntryBatch endOfFile(Path lastWalPath) {
+ WALEntryBatch batch = new WALEntryBatch(0, lastWalPath);
+ batch.setLastWalPosition(-1L);
+ batch.setEndOfFile(true);
+ return batch;
+ }
+
public void addEntry(Entry entry) {
walEntries.add(entry);
}
@@ -120,6 +134,14 @@ class WALEntryBatch {
return lastSeqIds;
}
+ public boolean isEndOfFile() {
+ return endOfFile;
+ }
+
+ public void setEndOfFile(boolean endOfFile) {
+ this.endOfFile = endOfFile;
+ }
+
public void incrementNbRowKeys(int increment) {
nbRowKeys += increment;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index c639a48..b2c199e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -155,7 +155,6 @@ class WALEntryStream implements Closeable {
/**
* Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
* false)
- * @throws IOException
*/
public void reset() throws IOException {
if (reader != null && currentPath != null) {
@@ -304,6 +303,9 @@ class WALEntryStream implements Closeable {
if (reader != null) {
return true;
}
+ } else {
+ // no more files in queue, this could only happen for recovered queue.
+ setCurrentPath(null);
}
return false;
}
@@ -394,6 +396,7 @@ class WALEntryStream implements Closeable {
private void resetReader() throws IOException {
try {
+ currentEntry = null;
reader.reset();
seek();
} catch (FileNotFoundException fnfe) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 6d75fec..eb46cd7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
@@ -308,25 +309,25 @@ public abstract class TestReplicationSourceManager {
for (int i = 0; i < 3; i++) {
wal.append(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
- edit, true);
+ edit, true);
}
wal.sync();
int logNumber = 0;
- for (Map.Entry<String, SortedSet<String>> entry : manager.getWALs().get(slaveId).entrySet()) {
+ for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
+ .entrySet()) {
logNumber += entry.getValue().size();
}
assertEquals(6, logNumber);
wal.rollWriter();
- manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
- "1", 0, null, false);
+ manager.logPositionAndCleanOldLogs("1", false,
+ new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
wal.append(hri,
- new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
- edit,
- true);
+ new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
+ edit, true);
wal.sync();
assertEquals(1, manager.getWALs().size());
@@ -396,7 +397,7 @@ public abstract class TestReplicationSourceManager {
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
- manager.cleanOldLogs(file2, id, true);
+ manager.cleanOldLogs(file2, false, id, true);
// log1 should be deleted
assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index eb7d5a0..2670756 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@@ -75,7 +76,7 @@ public class TestWALEntryStream {
HBaseClassTestRule.forClass(TestWALEntryStream.class);
private static HBaseTestingUtility TEST_UTIL;
- private static Configuration conf;
+ private static Configuration CONF;
private static FileSystem fs;
private static MiniDFSCluster cluster;
private static final TableName tableName = TableName.valueOf("tablename");
@@ -102,7 +103,7 @@ public class TestWALEntryStream {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
- conf = TEST_UTIL.getConfiguration();
+ CONF = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniDFSCluster(3);
cluster = TEST_UTIL.getDFSCluster();
@@ -118,7 +119,7 @@ public class TestWALEntryStream {
public void setUp() throws Exception {
walQueue = new PriorityBlockingQueue<>();
pathWatcher = new PathWatcher();
- final WALFactory wals = new WALFactory(conf, tn.getMethodName());
+ final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
wals.getWALProvider().addWALActionsListener(pathWatcher);
log = wals.getWAL(info);
}
@@ -144,13 +145,13 @@ public class TestWALEntryStream {
mvcc.advanceTo(1);
for (int i = 0; i < nbRows; i++) {
- appendToLogPlus(walEditKVs);
+ appendToLogAndSync(walEditKVs);
}
log.rollWriter();
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
int i = 0;
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());
@@ -174,10 +175,10 @@ public class TestWALEntryStream {
*/
@Test
public void testAppendsWithRolls() throws Exception {
- appendToLog();
+ appendToLogAndSync();
long oldPos;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.peek();
@@ -189,9 +190,9 @@ public class TestWALEntryStream {
oldPos = entryStream.getPosition();
}
- appendToLog();
+ appendToLogAndSync();
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
+ try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
log, null, new MetricsSource("1"))) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
@@ -201,11 +202,11 @@ public class TestWALEntryStream {
}
// We rolled but we still should see the end of the first log and get that item
- appendToLog();
+ appendToLogAndSync();
log.rollWriter();
- appendToLog();
+ appendToLogAndSync();
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
+ try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
log, null, new MetricsSource("1"))) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
@@ -231,7 +232,7 @@ public class TestWALEntryStream {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
assertEquals("1", getRow(entryStream.next()));
appendToLog("3"); // 3 - comes in after reader opened
@@ -256,7 +257,7 @@ public class TestWALEntryStream {
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
// some new entries come in while we're streaming
@@ -279,7 +280,7 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");
@@ -287,7 +288,7 @@ public class TestWALEntryStream {
}
// next stream should picks up where we left off
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
@@ -302,16 +303,16 @@ public class TestWALEntryStream {
@Test
public void testPosition() throws Exception {
long lastPosition = 0;
- appendEntriesToLog(3);
+ appendEntriesToLogAndSync(3);
// read only one element
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
+ try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition,
log, null, new MetricsSource("1"))) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
@@ -322,38 +323,44 @@ public class TestWALEntryStream {
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
assertFalse(entryStream.hasNext());
}
}
+ private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
+ ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+ when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ Server mockServer = Mockito.mock(Server.class);
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ when(source.getSourceManager()).thenReturn(mockSourceManager);
+ when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
+ when(source.getWALFileLengthProvider()).thenReturn(log);
+ when(source.getServer()).thenReturn(mockServer);
+ when(source.isRecovered()).thenReturn(recovered);
+ ReplicationSourceWALReader reader =
+ new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
+ reader.start();
+ return reader;
+ }
+
@Test
- public void testReplicationSourceWALReaderThread() throws Exception {
- appendEntriesToLog(3);
+ public void testReplicationSourceWALReader() throws Exception {
+ appendEntriesToLogAndSync(3);
// get ending position
long position;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next();
entryStream.next();
entryStream.next();
position = entryStream.getPosition();
}
- // start up a batcher
- ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
- when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- Server mockServer= Mockito.mock(Server.class);
- ReplicationSource source = Mockito.mock(ReplicationSource.class);
- when(source.getSourceManager()).thenReturn(mockSourceManager);
- when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
- when(source.getWALFileLengthProvider()).thenReturn(log);
- when(source.getServer()).thenReturn(mockServer);
- ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
- walQueue, 0, getDummyFilter(), source);
+ // start up a reader
Path walPath = walQueue.peek();
- batcher.start();
- WALEntryBatch entryBatch = batcher.take();
+ ReplicationSourceWALReader reader = createReader(false, CONF);
+ WALEntryBatch entryBatch = reader.take();
// should've batched up our entries
assertNotNull(entryBatch);
@@ -363,11 +370,96 @@ public class TestWALEntryStream {
assertEquals(3, entryBatch.getNbRowKeys());
appendToLog("foo");
- entryBatch = batcher.take();
+ entryBatch = reader.take();
assertEquals(1, entryBatch.getNbEntries());
assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
}
+ @Test
+ public void testReplicationSourceWALReaderRecovered() throws Exception {
+ appendEntriesToLogAndSync(10);
+ Path walPath = walQueue.peek();
+ log.rollWriter();
+ appendEntriesToLogAndSync(5);
+ log.shutdown();
+
+ Configuration conf = new Configuration(CONF);
+ conf.setInt("replication.source.nb.capacity", 10);
+
+ ReplicationSourceWALReader reader = createReader(true, conf);
+
+ WALEntryBatch batch = reader.take();
+ assertEquals(walPath, batch.getLastWalPath());
+ assertEquals(10, batch.getNbEntries());
+ assertFalse(batch.isEndOfFile());
+
+ batch = reader.take();
+ assertEquals(walPath, batch.getLastWalPath());
+ assertEquals(0, batch.getNbEntries());
+ assertTrue(batch.isEndOfFile());
+
+ walPath = walQueue.peek();
+ batch = reader.take();
+ assertEquals(walPath, batch.getLastWalPath());
+ assertEquals(5, batch.getNbEntries());
+ // Actually this should be true but we haven't handled this yet since for a normal queue the
+ // last one is always open... Not a big deal for now.
+ assertFalse(batch.isEndOfFile());
+
+ assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
+ }
+
+ // Testcase for HBASE-20206
+ @Test
+ public void testReplicationSourceWALReaderWrongPosition() throws Exception {
+ appendEntriesToLogAndSync(1);
+ Path walPath = walQueue.peek();
+ log.rollWriter();
+ appendEntriesToLogAndSync(20);
+ TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return fs.getFileStatus(walPath).getLen() > 0;
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return walPath + " has not been closed yet";
+ }
+
+ });
+ long walLength = fs.getFileStatus(walPath).getLen();
+
+ ReplicationSourceWALReader reader = createReader(false, CONF);
+
+ WALEntryBatch entryBatch = reader.take();
+ assertEquals(walPath, entryBatch.getLastWalPath());
+ assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " +
+ walLength, entryBatch.getLastWalPosition() <= walLength);
+ assertEquals(1, entryBatch.getNbEntries());
+ assertTrue(entryBatch.isEndOfFile());
+
+ Path walPath2 = walQueue.peek();
+ entryBatch = reader.take();
+ assertEquals(walPath2, entryBatch.getLastWalPath());
+ assertEquals(20, entryBatch.getNbEntries());
+ assertFalse(entryBatch.isEndOfFile());
+
+ log.rollWriter();
+ appendEntriesToLogAndSync(10);
+ entryBatch = reader.take();
+ assertEquals(walPath2, entryBatch.getLastWalPath());
+ assertEquals(0, entryBatch.getNbEntries());
+ assertTrue(entryBatch.isEndOfFile());
+
+ Path walPath3 = walQueue.peek();
+ entryBatch = reader.take();
+ assertEquals(walPath3, entryBatch.getLastWalPath());
+ assertEquals(10, entryBatch.getNbEntries());
+ assertFalse(entryBatch.isEndOfFile());
+ }
+
private String getRow(WAL.Entry entry) {
Cell cell = entry.getEdit().getCells().get(0);
return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
@@ -380,22 +472,28 @@ public class TestWALEntryStream {
log.sync(txid);
}
- private void appendEntriesToLog(int count) throws IOException {
+ private void appendEntriesToLogAndSync(int count) throws IOException {
+ long txid = -1L;
for (int i = 0; i < count; i++) {
- appendToLog();
+ txid = appendToLog(1);
}
+ log.sync(txid);
}
- private void appendToLog() throws IOException {
- appendToLogPlus(1);
+ private void appendToLogAndSync() throws IOException {
+ appendToLogAndSync(1);
}
- private void appendToLogPlus(int count) throws IOException {
- final long txid = log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
+ private void appendToLogAndSync(int count) throws IOException {
+ long txid = appendToLog(count);
log.sync(txid);
}
+ private long appendToLog(int count) throws IOException {
+ return log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
+ }
+
private WALEdit getWALEdits(int count) {
WALEdit edit = new WALEdit();
for (int i = 0; i < count; i++) {
@@ -439,7 +537,7 @@ public class TestWALEntryStream {
appendToLog("2");
long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1);
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
+ try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0,
p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());