You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/04 09:59:56 UTC
[29/29] hbase git commit: HBASE-20456 Support removing a ReplicationSourceShipper for a special wal group
HBASE-20456 Support removing a ReplicationSourceShipper for a special wal group
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/75046eeb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/75046eeb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/75046eeb
Branch: refs/heads/HBASE-19064
Commit: 75046eebf6d8bfaabdbdb8c2f16f8930ce892441
Parents: b564486
Author: zhangduo <zh...@apache.org>
Authored: Tue Apr 24 22:01:21 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri May 4 17:54:52 2018 +0800
----------------------------------------------------------------------
.../hbase/regionserver/wal/AsyncFSWAL.java | 1 +
.../RecoveredReplicationSource.java | 13 +---
.../RecoveredReplicationSourceShipper.java | 7 --
.../regionserver/ReplicationSource.java | 13 +++-
.../regionserver/ReplicationSourceManager.java | 19 ++++-
.../regionserver/ReplicationSourceShipper.java | 20 +++--
.../ReplicationSourceWALReader.java | 9 ++-
.../regionserver/WALEntryStream.java | 3 +-
.../hadoop/hbase/wal/AbstractFSWALProvider.java | 28 ++++---
.../hbase/wal/SyncReplicationWALProvider.java | 10 ++-
.../TestReplicationSourceManager.java | 5 +-
.../TestSyncReplicationShipperQuit.java | 81 ++++++++++++++++++++
.../regionserver/TestWALEntryStream.java | 4 +-
13 files changed, 163 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 17133ed..f630e63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -682,6 +682,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
protected void doShutdown() throws IOException {
waitForSafePoint();
closeWriter(this.writer);
+ this.writer = null;
closeExecutor.shutdown();
try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index a21ca44..f1bb538 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -144,15 +143,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
}
void tryFinish() {
- // use synchronize to make sure one last thread will clean the queue
- synchronized (workerThreads) {
- Threads.sleep(100);// wait a short while for other worker thread to fully exit
- boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished());
- if (allTasksDone) {
- this.getSourceMetrics().clear();
- manager.removeRecoveredSource(this);
- LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats());
- }
+ if (workerThreads.isEmpty()) {
+ this.getSourceMetrics().clear();
+ manager.finishRecoveredSource(this);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 91109cf..b0d4db0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -48,13 +48,6 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
}
@Override
- protected void noMoreData() {
- LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, source.getQueueId());
- source.getSourceMetrics().incrCompletedRecoveryQueue();
- setWorkerState(WorkerState.FINISHED);
- }
-
- @Override
protected void postFinish() {
source.tryFinish();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 01ccb11..1a27fc1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -62,6 +62,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
@@ -122,6 +123,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
private long defaultBandwidth;
private long currentBandwidth;
private WALFileLengthProvider walFileLengthProvider;
+ @VisibleForTesting
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();
@@ -192,6 +194,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
+ // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
+ // the shipper may quit immediately
+ queue.put(log);
queues.put(logPrefix, queue);
if (this.isSourceActive() && this.replicationEndpoint != null) {
// new wal group observed after source startup, start a new worker thread to track it
@@ -199,8 +204,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
// still not launched, so it's necessary to check workerThreads before start the worker
tryStartNewShipper(logPrefix, queue);
}
+ } else {
+ queue.put(log);
}
- queue.put(log);
+
this.metrics.incrSizeOfLogQueue();
// This will log a warning for each new log that gets created above the warn threshold
int queueSize = queue.size();
@@ -611,5 +618,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
return queueStorage;
}
-
+ void removeWorker(ReplicationSourceShipper worker) {
+ workerThreads.remove(worker.walGroupId, worker);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index cbeba23..2d0d82b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -443,12 +443,25 @@ public class ReplicationSourceManager implements ReplicationListener {
* Clear the metrics and related replication queue of the specified old source
* @param src source to clear
*/
- void removeRecoveredSource(ReplicationSourceInterface src) {
- LOG.info("Done with the recovered queue " + src.getQueueId());
- this.oldsources.remove(src);
+ private boolean removeRecoveredSource(ReplicationSourceInterface src) {
+ if (!this.oldsources.remove(src)) {
+ return false;
+ }
+ LOG.info("Done with the recovered queue {}", src.getQueueId());
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
this.walsByIdRecoveredQueues.remove(src.getQueueId());
+ return true;
+ }
+
+ void finishRecoveredSource(ReplicationSourceInterface src) {
+ synchronized (oldsources) {
+ if (!removeRecoveredSource(src)) {
+ return;
+ }
+ }
+ LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
+ src.getStats());
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 3f97b5e..b1361fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -50,13 +50,13 @@ public class ReplicationSourceShipper extends Thread {
public enum WorkerState {
RUNNING,
STOPPED,
- FINISHED, // The worker is done processing a recovered queue
+ FINISHED, // The worker is done processing a queue
}
private final Configuration conf;
protected final String walGroupId;
protected final PriorityBlockingQueue<Path> queue;
- private final ReplicationSourceInterface source;
+ private final ReplicationSource source;
// Last position in the log that we sent to ZooKeeper
// It will be accessed by the stats thread so make it volatile
@@ -73,7 +73,7 @@ public class ReplicationSourceShipper extends Thread {
protected final int maxRetriesMultiplier;
public ReplicationSourceShipper(Configuration conf, String walGroupId,
- PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
+ PriorityBlockingQueue<Path> queue, ReplicationSource source) {
this.conf = conf;
this.walGroupId = walGroupId;
this.queue = queue;
@@ -98,7 +98,7 @@ public class ReplicationSourceShipper extends Thread {
}
try {
WALEntryBatch entryBatch = entryReader.take();
- // the NO_MORE_DATA instance has no path so do not all shipEdits
+ // the NO_MORE_DATA instance has no path so do not call shipEdits
if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
noMoreData();
} else {
@@ -113,12 +113,20 @@ public class ReplicationSourceShipper extends Thread {
if (!isFinished()) {
setWorkerState(WorkerState.STOPPED);
} else {
+ source.removeWorker(this);
postFinish();
}
}
- // To be implemented by recovered shipper
- protected void noMoreData() {
+ private void noMoreData() {
+ if (source.isRecovered()) {
+ LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,
+ source.getQueueId());
+ source.getSourceMetrics().incrCompletedRecoveryQueue();
+ } else {
+ LOG.debug("Finished queue for group {} of peer {}", walGroupId, source.getQueueId());
+ }
+ setWorkerState(WorkerState.FINISHED);
}
// To be implemented by recovered shipper
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 64fd48d..61ab7c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -142,7 +142,7 @@ class ReplicationSourceWALReader extends Thread {
entryBatchQueue.put(batch);
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
- handleEmptyWALEntryBatch(entryStream.getCurrentPath());
+ handleEmptyWALEntryBatch();
entryStream.reset(); // reuse stream
}
}
@@ -224,10 +224,11 @@ class ReplicationSourceWALReader extends Thread {
return batch;
}
- private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+ private void handleEmptyWALEntryBatch() throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
- if (source.isRecovered()) {
- // we're done with queue recovery, shut ourself down
+ if (logQueue.isEmpty()) {
+ // we're done with current queue, either this is a recovered queue, or it is the special group
+ // for a sync replication peer and the peer has been transited to DA or S state.
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index b2c199e..0393af4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -304,7 +304,8 @@ class WALEntryStream implements Closeable {
return true;
}
} else {
- // no more files in queue, this could only happen for recovered queue.
+ // no more files in queue, this could happen for recovered queue, or for a wal group of a sync
+ // replication peer which has already been transited to DA or S.
setCurrentPath(null);
}
return false;
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 5a3fba3..e528624 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
@@ -247,26 +248,30 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
if (walName == null) {
throw new IllegalArgumentException("The WAL path couldn't be null");
}
- final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER);
- return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2 : 1)]);
+ Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(walName.getName());
+ if (matcher.matches()) {
+ return Long.parseLong(matcher.group(2));
+ } else {
+ throw new IllegalArgumentException(walName.getName() + " is not a valid wal file name");
+ }
}
/**
* Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
* description.
*/
- private static final Pattern pattern =
- Pattern.compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*");
+ private static final Pattern WAL_FILE_NAME_PATTERN =
+ Pattern.compile("(.+)\\.(\\d+)(\\.[0-9A-Za-z]+)?");
/**
* A WAL file name is of the format: <wal-name>{@link #WAL_FILE_NAME_DELIMITER}
- * <file-creation-timestamp>[.meta]. provider-name is usually made up of a server-name and a
- * provider-id
+ * <file-creation-timestamp>[.<suffix>]. provider-name is usually made up of a
+ * server-name and a provider-id
* @param filename name of the file to validate
* @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise
*/
public static boolean validateWALFilename(String filename) {
- return pattern.matcher(filename).matches();
+ return WAL_FILE_NAME_PATTERN.matcher(filename).matches();
}
/**
@@ -517,10 +522,15 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
* log_prefix.filenumber.log_suffix
* @param name Name of the WAL to parse
* @return prefix of the log
+ * @throws IllegalArgumentException if the name passed in is not a valid wal file name
* @see AbstractFSWAL#getCurrentFileName()
*/
public static String getWALPrefixFromWALName(String name) {
- int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
- return name.substring(0, endIndex);
+ Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
+ if (matcher.matches()) {
+ return matcher.group(1);
+ } else {
+ throw new IllegalArgumentException(name + " is not a valid wal file name");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 3b56aa2..8faccd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.wal;
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
@@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@@ -113,8 +113,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
channelClass = eventLoopGroupAndChannelClass.getSecond();
}
+ // Use a timestamp to make it unique. That means, after we transit the peer to DA/S and then
+ // back to A, the log prefix will be changed. This is used to simplify the implementation for
+ // replication source, where we do not need to consider that a terminated shipper could be added
+ // back.
private String getLogPrefix(String peerId) {
- return factory.factoryId + WAL_FILE_NAME_DELIMITER + peerId;
+ return factory.factoryId + "-" + EnvironmentEdgeManager.currentTime() + "-" + peerId;
}
private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
@@ -250,7 +254,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
@Override
public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
SyncReplicationState to, int stage) {
- if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE) {
+ if (from == SyncReplicationState.ACTIVE) {
if (stage == 0) {
Lock lock = createLock.acquireLock(peerId);
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 5ea3173..cff8ceb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
@@ -393,8 +394,8 @@ public abstract class TestReplicationSourceManager {
// populate some znodes in the peer znode
SortedSet<String> files = new TreeSet<>();
String group = "testgroup";
- String file1 = group + ".log1";
- String file2 = group + ".log2";
+ String file1 = group + "." + EnvironmentEdgeManager.currentTime() + ".log1";
+ String file2 = group + "." + EnvironmentEdgeManager.currentTime() + ".log2";
files.add(file1);
files.add(file2);
for (String file : files) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java
new file mode 100644
index 0000000..f6dc3d7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase for HBASE-20456.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationShipperQuit extends SyncReplicationTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSyncReplicationShipperQuit.class);
+
+ @Test
+ public void testShipperQuitWhenDA() throws Exception {
+ // set to serial replication
+ UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+ .newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
+ UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+ .newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.ACTIVE);
+
+ writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+ HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+ DualAsyncFSWAL wal =
+ (DualAsyncFSWAL) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+ String walGroupId =
+ AbstractFSWALProvider.getWALPrefixFromWALName(wal.getCurrentFileName().getName());
+ ReplicationSourceShipper shipper =
+ ((ReplicationSource) ((Replication) rs.getReplicationSourceService()).getReplicationManager()
+ .getSource(PEER_ID)).workerThreads.get(walGroupId);
+ assertFalse(shipper.isFinished());
+
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ writeAndVerifyReplication(UTIL1, UTIL2, 100, 200);
+
+ ReplicationSource source = (ReplicationSource) ((Replication) rs.getReplicationSourceService())
+ .getReplicationManager().getSource(PEER_ID);
+ // the peer is serial so here we can make sure that the previous wals have already been
+ // replicated, and finally the shipper should be removed from the worker pool
+ UTIL1.waitFor(10000, () -> !source.workerThreads.containsKey(walGroupId));
+ assertTrue(shipper.isFinished());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 35e4f82..fac6f74 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -413,9 +413,7 @@ public class TestWALEntryStream {
batch = reader.take();
assertEquals(walPath, batch.getLastWalPath());
assertEquals(5, batch.getNbEntries());
- // Actually this should be true but we haven't handled this yet since for a normal queue the
- // last one is always open... Not a big deal for now.
- assertFalse(batch.isEndOfFile());
+ assertTrue(batch.isEndOfFile());
assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
}