You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/15 00:40:49 UTC
[30/32] hbase git commit: HBASE-20434 Also remove remote wals when peer is in DA state
HBASE-20434 Also remove remote wals when peer is in DA state
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c22cc8be
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c22cc8be
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c22cc8be
Branch: refs/heads/HBASE-19064
Commit: c22cc8be0fb15afb52b71019c4abc714c6276885
Parents: 71be2a8
Author: zhangduo <zh...@apache.org>
Authored: Wed Apr 25 17:12:23 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue May 15 08:39:04 2018 +0800
----------------------------------------------------------------------
.../hbase/replication/ReplicationUtils.java | 4 +
...ransitPeerSyncReplicationStateProcedure.java | 2 +-
.../regionserver/ReplicationSource.java | 7 +-
.../regionserver/ReplicationSourceManager.java | 86 ++++++++++------
.../hadoop/hbase/wal/AbstractFSWALProvider.java | 19 ++--
.../hbase/wal/SyncReplicationWALProvider.java | 30 +++++-
.../TestSyncReplicationRemoveRemoteWAL.java | 101 +++++++++++++++++++
.../TestReplicationSourceManager.java | 68 ++++++++-----
8 files changed, 251 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c22cc8be/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 66e9b01..069db7a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -191,6 +191,10 @@ public final class ReplicationUtils {
return new Path(remoteWALDir, peerId);
}
+ public static Path getRemoteWALDirForPeer(Path remoteWALDir, String peerId) {
+ return new Path(remoteWALDir, peerId);
+ }
+
/**
* Do the sleeping logic
* @param msg Why we sleep
http://git-wip-us.apache.org/repos/asf/hbase/blob/c22cc8be/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 5da2b0c..99fd615 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -211,7 +211,7 @@ public class TransitPeerSyncReplicationStateProcedure
case CREATE_DIR_FOR_REMOTE_WAL:
MasterFileSystem mfs = env.getMasterFileSystem();
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
- Path remoteWALDirForPeer = new Path(remoteWALDir, peerId);
+ Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
FileSystem walFs = mfs.getWALFileSystem();
try {
if (walFs.exists(remoteWALDirForPeer)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c22cc8be/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 1a27fc1..7313f13 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -549,14 +549,17 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
/**
+ * <p>
* Split a path to get the start time
+ * </p>
+ * <p>
* For example: 10.20.20.171%3A60020.1277499063250
+ * </p>
* @param p path to split
* @return start time
*/
private static long getTS(Path p) {
- int tsIndex = p.getName().lastIndexOf('.') + 1;
- return Long.parseLong(p.getName().substring(tsIndex));
+ return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c22cc8be/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 2d0d82b..5015129 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -542,20 +544,40 @@ public class ReplicationSourceManager implements ReplicationListener {
if (source.isRecovered()) {
NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
if (wals != null) {
- cleanOldLogs(wals, log, inclusive, source);
+ NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
+ if (walsToRemove.isEmpty()) {
+ return;
+ }
+ cleanOldLogs(walsToRemove, source);
+ walsToRemove.clear();
}
} else {
+ NavigableSet<String> wals;
+ NavigableSet<String> walsToRemove;
// synchronized on walsById to avoid race with preLogRoll
synchronized (this.walsById) {
- NavigableSet<String> wals = walsById.get(source.getQueueId()).get(logPrefix);
- if (wals != null) {
- cleanOldLogs(wals, log, inclusive, source);
+ wals = walsById.get(source.getQueueId()).get(logPrefix);
+ if (wals == null) {
+ return;
+ }
+ walsToRemove = wals.headSet(log, inclusive);
+ if (walsToRemove.isEmpty()) {
+ return;
}
+ walsToRemove = new TreeSet<>(walsToRemove);
+ }
+ // cleanOldLogs may spend some time, especially for sync replication where we may want to
+ // remove remote wals as the remote cluster may have already been down, so we do it outside
+ // the lock to avoid block preLogRoll
+ cleanOldLogs(walsToRemove, source);
+ // now let's remove the files in the set
+ synchronized (this.walsById) {
+ wals.removeAll(walsToRemove);
}
}
}
- private void removeRemoteWALs(String peerId, String remoteWALDir, Set<String> wals)
+ private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
throws IOException {
Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
@@ -575,13 +597,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive,
- ReplicationSourceInterface source) {
- NavigableSet<String> walSet = wals.headSet(key, inclusive);
- if (walSet.isEmpty()) {
- return;
- }
- LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
+ private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
+ LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
// The intention here is that, we want to delete the remote wal files ASAP as it may effect the
// failover time if you want to transit the remote cluster from S to A. And the infinite retry
// is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can
@@ -589,31 +606,38 @@ public class ReplicationSourceManager implements ReplicationListener {
if (source.isSyncReplication()) {
String peerId = source.getPeerId();
String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
- LOG.debug("Removing {} logs from remote dir {} in the list: {}", walSet.size(), remoteWALDir,
- walSet);
- for (int sleepMultiplier = 0;;) {
- try {
- removeRemoteWALs(peerId, remoteWALDir, walSet);
- break;
- } catch (IOException e) {
- LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
- peerId);
- }
- if (!source.isSourceActive()) {
- // skip the following operations
- return;
- }
- if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
- sleepMultiplier, maxRetriesMultiplier)) {
- sleepMultiplier++;
+ // Filter out the wals need to be removed from the remote directory. Its name should be the
+ // special format, and also, the peer id in its name should match the peer id for the
+ // replication source.
+ List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
+ .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
+ .collect(Collectors.toList());
+ LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
+ remoteWALDir, remoteWals);
+ if (!remoteWals.isEmpty()) {
+ for (int sleepMultiplier = 0;;) {
+ try {
+ removeRemoteWALs(peerId, remoteWALDir, remoteWals);
+ break;
+ } catch (IOException e) {
+ LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
+ peerId);
+ }
+ if (!source.isSourceActive()) {
+ // skip the following operations
+ return;
+ }
+ if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
+ sleepMultiplier, maxRetriesMultiplier)) {
+ sleepMultiplier++;
+ }
}
}
}
String queueId = source.getQueueId();
- for (String wal : walSet) {
+ for (String wal : wals) {
abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal));
}
- walSet.clear();
}
// public because of we call it in TestReplicationEmptyWALRecovery
http://git-wip-us.apache.org/repos/asf/hbase/blob/c22cc8be/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index e528624..ccdc95f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -517,6 +517,14 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
listeners.add(listener);
}
+ private static String getWALNameGroupFromWALName(String name, int group) {
+ Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
+ if (matcher.matches()) {
+ return matcher.group(group);
+ } else {
+ throw new IllegalArgumentException(name + " is not a valid wal file name");
+ }
+ }
/**
* Get prefix of the log from its name, assuming WAL name in format of
* log_prefix.filenumber.log_suffix
@@ -526,11 +534,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
* @see AbstractFSWAL#getCurrentFileName()
*/
public static String getWALPrefixFromWALName(String name) {
- Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
- if (matcher.matches()) {
- return matcher.group(1);
- } else {
- throw new IllegalArgumentException(name + " is not a valid wal file name");
- }
+ return getWALNameGroupFromWALName(name, 1);
+ }
+
+ public static long getWALStartTimeFromWALName(String name) {
+ return Long.parseLong(getWALNameGroupFromWALName(name, 2));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c22cc8be/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 8faccd7..8e82d8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
@@ -48,6 +50,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -64,7 +67,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
- private static final String LOG_SUFFIX = ".syncrep";
+ @VisibleForTesting
+ public static final String LOG_SUFFIX = ".syncrep";
private final WALProvider provider;
@@ -288,4 +292,28 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
return false;
}
}
+
+ private static final Pattern LOG_PREFIX_PATTERN = Pattern.compile(".*-\\d+-(.+)");
+
+ /**
+ * <p>
+ * Returns the peer id if the WAL file name belongs to a sync replication peer, else empty.
+ * </p>
+ * <p>
+ * The prefix format is {@code <factoryId>-<ts>-<peerId>}. NOTE(review): the match is greedy,
+ * so a peer id that itself contains {@code -<digits>-} would be truncated.</p>
+ */
+ public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) {
+ if (!name.endsWith(LOG_SUFFIX)) {
+ // fast path: a name without the sync replication suffix cannot belong to such a peer.
+ return Optional.empty();
+ }
+ String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
+ Matcher matcher = LOG_PREFIX_PATTERN.matcher(logPrefix);
+ if (matcher.matches()) {
+ return Optional.of(matcher.group(1));
+ } else {
+ return Optional.empty();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c22cc8be/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
new file mode 100644
index 0000000..7d380c1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSyncReplicationRemoveRemoteWAL.class);
+ // Wait until remoteWAL no longer exists on UTIL2's WAL file system (timeout 30000).
+ private void waitUntilDeleted(Path remoteWAL) throws Exception {
+ MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+ UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return !mfs.getWALFileSystem().exists(remoteWAL);
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return remoteWAL + " has not been deleted yet";
+ }
+ });
+ }
+ // Remote WALs must be removed once replicated, in both ACTIVE and DOWNGRADE_ACTIVE states.
+ @Test
+ public void testRemoveRemoteWAL() throws Exception {
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.ACTIVE);
+ // exactly one remote WAL for the peer is expected on the standby cluster (UTIL2)
+ MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+ Path remoteWALDir = ReplicationUtils.getRemoteWALDirForPeer(
+ new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
+ FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
+ assertEquals(1, remoteWALStatus.length);
+ Path remoteWAL = remoteWALStatus[0].getPath();
+ assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
+ writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+
+ HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+ rs.getWalRoller().requestRollAll();
+ // after the roll, the replicated remote WAL file should eventually be deleted
+ waitUntilDeleted(remoteWAL);
+ remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
+ assertEquals(1, remoteWALStatus.length);
+ remoteWAL = remoteWALStatus[0].getPath();
+ assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
+ // disable the peer so the edits below stay unreplicated, then transit to DOWNGRADE_ACTIVE
+ UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+ write(UTIL1, 100, 200);
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+
+ // should still be there since the peer is disabled and the data has not been replicated yet
+ assertTrue(mfs.getWALFileSystem().exists(remoteWAL));
+
+ UTIL1.getAdmin().enableReplicationPeer(PEER_ID);
+ waitUntilReplicationDone(UTIL2, 200);
+ verifyThroughRegion(UTIL2, 100, 200);
+
+ // Confirm that the remote WAL files are also removed in DOWNGRADE_ACTIVE (DA) state
+ waitUntilDeleted(remoteWAL);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c22cc8be/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index cff8ceb..d98b7f85 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -592,27 +593,10 @@ public abstract class TestReplicationSourceManager {
}
}
- @Test
- public void testRemoveRemoteWALs() throws IOException {
- // make sure that we can deal with files which does not exist
- String walNameNotExists = "remoteWAL.0";
- Path wal = new Path(logDir, walNameNotExists);
- manager.preLogRoll(wal);
- manager.postLogRoll(wal);
-
- Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
- fs.mkdirs(remoteLogDirForPeer);
- String walName = "remoteWAL.1";
- Path remoteWAL =
- new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
- fs.create(remoteWAL).close();
- wal = new Path(logDir, walName);
- manager.preLogRoll(wal);
- manager.postLogRoll(wal);
-
+ private ReplicationSourceInterface mockReplicationSource(String peerId) {
ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
- when(source.getPeerId()).thenReturn(slaveId);
- when(source.getQueueId()).thenReturn(slaveId);
+ when(source.getPeerId()).thenReturn(peerId);
+ when(source.getQueueId()).thenReturn(peerId);
when(source.isRecovered()).thenReturn(false);
when(source.isSyncReplication()).thenReturn(true);
ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
@@ -621,17 +605,51 @@ public abstract class TestReplicationSourceManager {
ReplicationPeer peer = mock(ReplicationPeer.class);
when(peer.getPeerConfig()).thenReturn(config);
when(source.getPeer()).thenReturn(peer);
- manager.cleanOldLogs(walName, true, source);
+ return source;
+ }
- assertFalse(fs.exists(remoteWAL));
+ @Test
+ public void testRemoveRemoteWALs() throws Exception {
+ String peerId2 = slaveId + "_2";
+ addPeerAndWait(peerId2,
+ ReplicationPeerConfig.newBuilder()
+ .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build(),
+ true);
+ try {
+ // make sure that we can deal with files which do not exist
+ String walNameNotExists =
+ "remoteWAL-12345-" + slaveId + ".12345" + SyncReplicationWALProvider.LOG_SUFFIX;
+ Path wal = new Path(logDir, walNameNotExists);
+ manager.preLogRoll(wal);
+ manager.postLogRoll(wal);
+ // create a real remote WAL under the remote directory for slaveId
+ Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
+ fs.mkdirs(remoteLogDirForPeer);
+ String walName =
+ "remoteWAL-12345-" + slaveId + ".23456" + SyncReplicationWALProvider.LOG_SUFFIX;
+ Path remoteWAL =
+ new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ fs.create(remoteWAL).close();
+ wal = new Path(logDir, walName);
+ manager.preLogRoll(wal);
+ manager.postLogRoll(wal);
+
+ ReplicationSourceInterface source = mockReplicationSource(peerId2);
+ manager.cleanOldLogs(walName, true, source);
+ // still there: the peer id embedded in walName (slaveId) does not match peerId2
+ assertTrue(fs.exists(remoteWAL));
+ // with a source whose peer id matches, the remote WAL must be removed
+ source = mockReplicationSource(slaveId);
+ manager.cleanOldLogs(walName, true, source);
+ assertFalse(fs.exists(remoteWAL));
+ } finally {
+ removePeerAndWait(peerId2);
+ }
}
/**
* Add a peer and wait for it to initialize
- * @param peerId
- * @param peerConfig
* @param waitForSource Whether to wait for replication source to initialize
- * @throws Exception
*/
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
final boolean waitForSource) throws Exception {