You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/11/01 04:29:55 UTC
[hbase] 03/06: HBASE-27214 Implement the new replication hfile/log cleaner (#4722)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit ba692838438a3dde4f4444da68358e2ee13c9010
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Aug 31 21:24:09 2022 +0800
HBASE-27214 Implement the new replication hfile/log cleaner (#4722)
Signed-off-by: Xin Sun <dd...@gmail.com>
---
.../org/apache/hadoop/hbase/master/HMaster.java | 1 -
.../hbase/master/cleaner/FileCleanerDelegate.java | 2 +-
.../hadoop/hbase/master/region/MasterRegion.java | 2 +-
.../hbase/master/replication/AddPeerProcedure.java | 15 +-
.../master/replication/ReplicationPeerManager.java | 8 +
.../hadoop/hbase/regionserver/HRegionServer.java | 2 +-
.../hbase/replication/ReplicationOffsetUtil.java | 47 +++
.../replication/master/ReplicationLogCleaner.java | 234 +++++++++----
.../master/ReplicationLogCleanerBarrier.java | 85 +++++
.../regionserver/ReplicationSourceManager.java | 18 +-
.../regionserver/ReplicationSyncUp.java | 5 +-
.../hadoop/hbase/wal/AbstractFSWALProvider.java | 29 ++
.../org/apache/hadoop/hbase/wal/WALFactory.java | 29 +-
.../hbase/master/cleaner/TestLogsCleaner.java | 227 +++++-------
.../cleaner/TestReplicationHFileCleaner.java | 43 ++-
.../replication/TestReplicationOffsetUtil.java | 52 +++
.../replication/master/TestLogCleanerBarrier.java | 60 ++++
.../master/TestReplicationLogCleaner.java | 385 +++++++++++++++++++++
.../regionserver/TestReplicationSourceManager.java | 2 +-
.../apache/hadoop/hbase/wal/TestWALFactory.java | 2 +-
.../apache/hadoop/hbase/wal/TestWALMethods.java | 14 +
21 files changed, 1008 insertions(+), 254 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 3e14823dc20..1b71565dd0f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -4280,5 +4280,4 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
// initialize master side coprocessors before we start handling requests
this.cpHost = new MasterCoprocessorHost(this, conf);
}
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
index d37bb620273..e08f5329433 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
@@ -50,7 +50,7 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
}
/**
- * Used to do some cleanup work
+ * Will be called after cleaner run.
*/
default void postClean() {
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
index 5ed0df0aa58..338abf38c6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
@@ -383,7 +383,7 @@ public final class MasterRegion {
params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
walRoller.start();
- WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), server, false);
+ WALFactory walFactory = new WALFactory(conf, server.getServerName(), server, false);
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index 6d0acee76ca..25a4cd4b08e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -84,15 +83,21 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
@Override
protected void releaseLatch(MasterProcedureEnv env) {
+ env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
if (peerConfig.isSyncReplication()) {
env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
}
- ProcedurePrepareLatch.releaseLatch(latch, this);
+ super.releaseLatch(env);
}
@Override
protected void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException, ProcedureSuspendedException {
+ if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn("LogCleaner is run at the same time when adding peer {}, sleep {} secs",
+ peerId, backoff / 1000));
+ }
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preAddReplicationPeer(peerId, peerConfig);
@@ -128,9 +133,13 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
@Override
protected void afterReplay(MasterProcedureEnv env) {
if (getCurrentState() == getInitialState()) {
- // will try to acquire the lock when executing the procedure, no need to acquire it here
+ // do not need to disable log cleaner or acquire lock if we are in the initial state, later
+ // when executing the procedure we will try to disable and acquire.
return;
}
+ if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+ throw new IllegalStateException("can not disable log cleaner, this should not happen");
+ }
if (peerConfig.isSyncReplication()) {
if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
throw new IllegalStateException(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 59fd5414236..0a1dbf848bd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
@@ -97,6 +98,9 @@ public class ReplicationPeerManager {
// Only allow to add one sync replication peer concurrently
private final Semaphore syncReplicationPeerLock = new Semaphore(1);
+ private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
+ new ReplicationLogCleanerBarrier();
+
private final String clusterId;
private final Configuration conf;
@@ -691,4 +695,8 @@ public class ReplicationPeerManager {
public void releaseSyncReplicationPeerLock() {
syncReplicationPeerLock.release();
}
+
+ public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
+ return replicationLogCleanerBarrier;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7c166df74af..99d7e327d1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1700,7 +1700,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
* be hooked up to WAL.
*/
private void setupWALAndReplication() throws IOException {
- WALFactory factory = new WALFactory(conf, serverName.toString(), this, true);
+ WALFactory factory = new WALFactory(conf, serverName, this, true);
// TODO Replication make assumptions here based on the default filesystem impl
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java
new file mode 100644
index 00000000000..052c5542d47
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ReplicationOffsetUtil {
+
+ private ReplicationOffsetUtil() {
+ }
+
+ public static boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
+ // if no offset or the offset is just a place marker, replicate
+ if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
+ return true;
+ }
+ // otherwise, compare the timestamp
+ long walTs = AbstractFSWALProvider.getTimestamp(wal);
+ long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
+ if (walTs < startWalTs) {
+ return false;
+ } else if (walTs > startWalTs) {
+ return true;
+ }
+ // if the timestamp equals, usually it means we should include this wal but there is a special
+ // case, a negative offset means the wal has already been fully replicated, so here we should
+ // check the offset.
+ return offset.getOffset() >= 0;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 7135ca9a9b2..f1fd8f8d6b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -17,18 +17,29 @@
*/
package org.apache.hadoop.hbase.replication.master;
-import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,35 +51,129 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
/**
* Implementation of a log cleaner that checks if a log is still scheduled for replication before
* deleting it when its TTL is over.
+ * <p/>
+ * The logic is a bit complicated after we switch to use table based replication queue storage, see
+ * the design doc in HBASE-27109 and the comments in HBASE-27214 for more details.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
- private ZKWatcher zkw = null;
- private boolean shareZK = false;
- private ReplicationQueueStorage queueStorage;
+ private Set<ServerName> notFullyDeadServers;
+ private Set<String> peerIds;
+ // ServerName -> PeerId -> WalGroup -> Offset
+ // Here the server name is the source server name, so we can make sure that there is only one
+ // queue for a given peer, that's why we can use a String peerId as key instead of
+ // ReplicationQueueId.
+ private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
+ private ReplicationPeerManager rpm;
+ private Supplier<Set<ServerName>> getNotFullyDeadServers;
+
+ private boolean canFilter;
private boolean stopped = false;
- private Set<String> wals;
- private long readZKTimestamp = 0;
@Override
public void preClean() {
- readZKTimestamp = EnvironmentEdgeManager.currentTime();
- // TODO: revisit the implementation
- // try {
- // // The concurrently created new WALs may not be included in the return list,
- // // but they won't be deleted because they're not in the checking set.
- // wals = queueStorage.getAllWALs();
- // } catch (ReplicationException e) {
- // LOG.warn("Failed to read zookeeper, skipping checking deletable files");
- // wals = null;
- // }
+ if (this.getConf() == null) {
+ return;
+ }
+ canFilter = rpm.getReplicationLogCleanerBarrier().start();
+ if (canFilter) {
+ notFullyDeadServers = getNotFullyDeadServers.get();
+ peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId)
+ .collect(Collectors.toSet());
+ // must get the not fully dead servers first and then get the replication queue data, in this
+ // way we can make sure that, we should have added the missing replication queues for the dead
+ // region servers recorded in the above set, otherwise the logic in the
+ // filterForDeadRegionServer method may lead us to delete a wal still in use.
+ List<ReplicationQueueData> allQueueData;
+ try {
+ allQueueData = rpm.getQueueStorage().listAllQueues();
+ } catch (ReplicationException e) {
+ LOG.error("Can not list all replication queues, give up cleaning", e);
+ rpm.getReplicationLogCleanerBarrier().stop();
+ canFilter = false;
+ notFullyDeadServers = null;
+ peerIds = null;
+ return;
+ }
+ replicationOffsets = new HashMap<>();
+ for (ReplicationQueueData queueData : allQueueData) {
+ ReplicationQueueId queueId = queueData.getId();
+ ServerName serverName = queueId.getServerWALsBelongTo();
+ Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
+ replicationOffsets.computeIfAbsent(serverName, k -> new HashMap<>());
+ Map<String, ReplicationGroupOffset> offsets =
+ peerId2Offsets.computeIfAbsent(queueId.getPeerId(), k -> new HashMap<>());
+ offsets.putAll(queueData.getOffsets());
+ }
+ } else {
+ LOG.info("Skip replication log cleaner because an AddPeerProcedure is running");
+ }
}
@Override
public void postClean() {
- // release memory
- wals = null;
+ if (canFilter) {
+ rpm.getReplicationLogCleanerBarrier().stop();
+ canFilter = false;
+ // release memory
+ notFullyDeadServers = null;
+ peerIds = null;
+ replicationOffsets = null;
+ }
+ }
+
+ private boolean shouldDelete(ReplicationGroupOffset offset, FileStatus file) {
+ return !ReplicationOffsetUtil.shouldReplicate(offset, file.getPath().getName());
+ }
+
+ private boolean filterForLiveRegionServer(ServerName serverName, FileStatus file) {
+ Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
+ replicationOffsets.get(serverName);
+ if (peerId2Offsets == null) {
+ // if there are replication queues missing, we can not delete the wal
+ return false;
+ }
+ for (String peerId : peerIds) {
+ Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
+ // if no replication queue for a peer, we can not delete the wal
+ if (offsets == null) {
+ return false;
+ }
+ String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
+ ReplicationGroupOffset offset = offsets.get(walGroupId);
+ // if a replication queue still needs to replicate this wal, we can not delete it
+ if (!shouldDelete(offset, file)) {
+ return false;
+ }
+ }
+ // if all replication queues have already finished replicating this wal, we can delete it.
+ return true;
+ }
+
+ private boolean filterForDeadRegionServer(ServerName serverName, FileStatus file) {
+ Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
+ replicationOffsets.get(serverName);
+ if (peerId2Offsets == null) {
+ // no replication queue for this dead rs, we can delete all wal files for it
+ return true;
+ }
+ for (String peerId : peerIds) {
+ Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
+ if (offsets == null) {
+ // for dead server, we only care about existing replication queues, as we will delete a
+ // queue after we finish replicating it.
+ continue;
+ }
+ String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
+ ReplicationGroupOffset offset = offsets.get(walGroupId);
+ // if a replication queue still needs to replicate this wal, we can not delete it
+ if (!shouldDelete(offset, file)) {
+ return false;
+ }
+ }
+ // if all replication queues have already finished replicating this wal, we can delete it.
+ return true;
}
@Override
@@ -78,10 +183,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
if (this.getConf() == null) {
return files;
}
-
- if (wals == null) {
+ if (!canFilter) {
+ // We can not delete anything if there are AddPeerProcedure running at the same time
+ // See HBASE-27214 for more details.
return Collections.emptyList();
}
+
return Iterables.filter(files, new Predicate<FileStatus>() {
@Override
public boolean apply(FileStatus file) {
@@ -90,65 +197,56 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
if (file == null) {
return false;
}
- String wal = file.getPath().getName();
- boolean logInReplicationQueue = wals.contains(wal);
- if (logInReplicationQueue) {
- LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal);
+ if (peerIds.isEmpty()) {
+ // no peer, can always delete
+ return true;
+ }
+ // not a valid wal file name, delete
+ if (!AbstractFSWALProvider.validateWALFilename(file.getPath().getName())) {
+ return true;
+ }
+ // meta wal is always deletable as we will never replicate it
+ if (AbstractFSWALProvider.isMetaFile(file.getPath())) {
+ return true;
+ }
+ ServerName serverName =
+ AbstractFSWALProvider.parseServerNameFromWALName(file.getPath().getName());
+ if (notFullyDeadServers.contains(serverName)) {
+ return filterForLiveRegionServer(serverName, file);
+ } else {
+ return filterForDeadRegionServer(serverName, file);
}
- return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
}
});
}
+ private Set<ServerName> getNotFullyDeadServers(MasterServices services) {
+ List<ServerName> onlineServers = services.getServerManager().getOnlineServersList();
+ return Stream.concat(onlineServers.stream(),
+ services.getMasterProcedureExecutor().getProcedures().stream()
+ .filter(p -> p instanceof ServerCrashProcedure).filter(p -> !p.isFinished())
+ .map(p -> ((ServerCrashProcedure) p).getServerName()))
+ .collect(Collectors.toSet());
+ }
+
@Override
public void init(Map<String, Object> params) {
super.init(params);
- try {
- if (MapUtils.isNotEmpty(params)) {
- Object master = params.get(HMaster.MASTER);
- if (master != null && master instanceof HMaster) {
- zkw = ((HMaster) master).getZooKeeper();
- shareZK = true;
- }
- }
- if (zkw == null) {
- zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null);
+ if (MapUtils.isNotEmpty(params)) {
+ Object master = params.get(HMaster.MASTER);
+ if (master != null && master instanceof MasterServices) {
+ MasterServices m = (MasterServices) master;
+ rpm = m.getReplicationPeerManager();
+ getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
+ return;
}
- // TODO: revisit the implementation
- // this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
- } catch (IOException e) {
- LOG.error("Error while configuring " + this.getClass().getName(), e);
}
- }
-
- @InterfaceAudience.Private
- public void setConf(Configuration conf, ZKWatcher zk) {
- super.setConf(conf);
- try {
- this.zkw = zk;
- // TODO: revisit the implementation
- // this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
- } catch (Exception e) {
- LOG.error("Error while configuring " + this.getClass().getName(), e);
- }
- }
-
- @InterfaceAudience.Private
- public void setConf(Configuration conf, ZKWatcher zk,
- ReplicationQueueStorage replicationQueueStorage) {
- super.setConf(conf);
- this.zkw = zk;
- this.queueStorage = replicationQueueStorage;
+ throw new IllegalArgumentException("Missing " + HMaster.MASTER + " parameter");
}
@Override
public void stop(String why) {
- if (this.stopped) return;
this.stopped = true;
- if (!shareZK && this.zkw != null) {
- LOG.info("Stopping " + this.zkw);
- this.zkw.close();
- }
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java
new file mode 100644
index 00000000000..d8756518728
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A barrier to guard the execution of {@link ReplicationLogCleaner}.
+ * <p/>
+ * The reason why we introduce this class is because there could be race between
+ * {@link org.apache.hadoop.hbase.master.replication.AddPeerProcedure} and
+ * {@link ReplicationLogCleaner}. See HBASE-27214 for more details.
+ */
+@InterfaceAudience.Private
+public class ReplicationLogCleanerBarrier {
+
+ private enum State {
+ // the cleaner is not running
+ NOT_RUNNING,
+ // the cleaner is running
+ RUNNING,
+ // the cleaner is disabled
+ DISABLED
+ }
+
+ private State state = State.NOT_RUNNING;
+
+ // we could have multiple AddPeerProcedure running at the same time, so here we need to do
+ // reference counting.
+ private int numberDisabled = 0;
+
+ public synchronized boolean start() {
+ if (state == State.NOT_RUNNING) {
+ state = State.RUNNING;
+ return true;
+ }
+ if (state == State.DISABLED) {
+ return false;
+ }
+ throw new IllegalStateException("Unexpected state " + state);
+ }
+
+ public synchronized void stop() {
+ if (state != State.RUNNING) {
+ throw new IllegalStateException("Unexpected state " + state);
+ }
+ state = State.NOT_RUNNING;
+ }
+
+ public synchronized boolean disable() {
+ if (state == State.RUNNING) {
+ return false;
+ }
+ if (state == State.NOT_RUNNING) {
+ state = State.DISABLED;
+ }
+ numberDisabled++;
+ return true;
+ }
+
+ public synchronized void enable() {
+ if (state != State.DISABLED) {
+ throw new IllegalStateException("Unexpected state " + state);
+ }
+ numberDisabled--;
+ if (numberDisabled == 0) {
+ state = State.NOT_RUNNING;
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 7fab12e8311..e3745a7c2e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
@@ -806,22 +807,7 @@ public class ReplicationSourceManager {
if (AbstractFSWALProvider.isMetaFile(wal)) {
return false;
}
- // if no offset or the offset is just a place marker, replicate
- if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
- return true;
- }
- // otherwise, compare the timestamp
- long walTs = AbstractFSWALProvider.getTimestamp(wal);
- long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
- if (walTs < startWalTs) {
- return false;
- } else if (walTs > startWalTs) {
- return true;
- }
- // if the timestamp equals, usually it means we should include this wal but there is a special
- // case, a negative offset means the wal has already been fully replicated, so here we should
- // check the offset.
- return offset.getOffset() >= 0;
+ return ReplicationOffsetUtil.shouldReplicate(offset, wal);
}
void claimQueue(ReplicationQueueId queueId) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index 50ffd6df1af..b63ad473719 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -117,7 +117,10 @@ public class ReplicationSyncUp extends Configured implements Tool {
System.out.println("Start Replication Server start");
Replication replication = new Replication();
replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
- new WALFactory(conf, "test", null, false));
+ new WALFactory(conf,
+ ServerName
+ .valueOf(getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()),
+ null, false));
ReplicationSourceManager manager = replication.getReplicationManager();
manager.init();
claimReplicationQueues(zkw, manager);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index a401bbb28c0..98fb2e780a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.wal;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -39,6 +42,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -632,4 +636,29 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
public static String getWALPrefixFromWALName(String name) {
return getWALNameGroupFromWALName(name, 1);
}
+
+ private static final Pattern SERVER_NAME_PATTERN = Pattern.compile("^[^"
+ + ServerName.SERVERNAME_SEPARATOR + "]+" + ServerName.SERVERNAME_SEPARATOR
+ + Addressing.VALID_PORT_REGEX + ServerName.SERVERNAME_SEPARATOR + Addressing.VALID_PORT_REGEX);
+
+ /**
+ * Parse the server name from wal prefix. A wal's name always starts with a server name in
+ * non-test code.
+ * @throws IllegalArgumentException if the name passed in does not start with a server name
+ * @return the server name
+ */
+ public static ServerName parseServerNameFromWALName(String name) {
+ String decoded;
+ try {
+ decoded = URLDecoder.decode(name, StandardCharsets.UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ throw new AssertionError("should never happen", e);
+ }
+ Matcher matcher = SERVER_NAME_PATTERN.matcher(decoded);
+ if (matcher.find()) {
+ return ServerName.valueOf(matcher.group());
+ } else {
+ throw new IllegalArgumentException(name + " is not started with a server name");
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 9136099defd..95565e319be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
@@ -170,17 +171,35 @@ public class WALFactory {
}
/**
- * @param conf must not be null, will keep a reference to read params in later reader/writer
- * instances.
- * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
- * to make a directory
+ * Create a WALFactory.
*/
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*|.*/HBaseTestingUtility.java")
public WALFactory(Configuration conf, String factoryId) throws IOException {
// default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
// for HMaster or HRegionServer which take system table only. See HBASE-19999
this(conf, factoryId, null, true);
}
+ /**
+ * Create a WALFactory.
+ * <p/>
+ * This is the constructor you should use when creating a WALFactory in normal code, to make sure
+ * that the {@code factoryId} is the server name. We need this assumption in some places for
+ * parsing the server name out from the wal file name.
+ * @param conf must not be null, will keep a reference to read params
+ * in later reader/writer instances.
+ * @param serverName used to generate the factoryId, which will be placed at
+ * the start of the final file name
+ * @param abortable the server associated with this WAL file
+ * @param enableSyncReplicationWALProvider whether to wrap the wal provider in a
+ * {@link SyncReplicationWALProvider}
+ */
+ public WALFactory(Configuration conf, ServerName serverName, Abortable abortable,
+ boolean enableSyncReplicationWALProvider) throws IOException {
+ this(conf, serverName.toString(), abortable, enableSyncReplicationWALProvider);
+ }
+
/**
* @param conf must not be null, will keep a reference to read params
* in later reader/writer instances.
@@ -190,7 +209,7 @@ public class WALFactory {
* @param enableSyncReplicationWALProvider whether wrap the wal provider to a
* {@link SyncReplicationWALProvider}
*/
- public WALFactory(Configuration conf, String factoryId, Abortable abortable,
+ private WALFactory(Configuration conf, String factoryId, Abortable abortable,
boolean enableSyncReplicationWALProvider) throws IOException {
// until we've moved reader/writer construction down into providers, this initialization must
// happen prior to provider initialization, in case they need to instantiate a reader/writer.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 1a0537bcbaf..d7ba6c227c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -18,57 +18,60 @@
package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
+import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MockServer;
-import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// revisit later after we implement new replication log cleaner
-@Ignore
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
@Category({ MasterTests.class, MediumTests.class })
public class TestLogsCleaner {
@@ -88,22 +91,29 @@ public class TestLogsCleaner {
private static DirScanPool POOL;
+ private static String peerId = "1";
+
+ private MasterServices masterServices;
+
+ private ReplicationQueueStorage queueStorage;
+
+ @Rule
+ public final TableNameTestRule tableNameRule = new TableNameTestRule();
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- TEST_UTIL.startMiniZKCluster();
- TEST_UTIL.startMiniDFSCluster(1);
+ TEST_UTIL.startMiniCluster();
POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniZKCluster();
- TEST_UTIL.shutdownMiniDFSCluster();
+ TEST_UTIL.shutdownMiniCluster();
POOL.shutdownNow();
}
@Before
- public void beforeTest() throws IOException {
+ public void beforeTest() throws Exception {
conf = TEST_UTIL.getConfiguration();
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
@@ -112,14 +122,51 @@ public class TestLogsCleaner {
// root directory
fs.mkdirs(OLD_WALS_DIR);
+
+ TableName tableName = tableNameRule.getTableName();
+ TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
+ TEST_UTIL.getAdmin().createTable(td);
+ TEST_UTIL.waitTableAvailable(tableName);
+ queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), tableName);
+
+ masterServices = mock(MasterServices.class);
+ when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
+ ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
+ when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
+ when(rpm.getQueueStorage()).thenReturn(queueStorage);
+ when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
+ when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
+ ServerManager sm = mock(ServerManager.class);
+ when(masterServices.getServerManager()).thenReturn(sm);
+ when(sm.getOnlineServersList()).thenReturn(Collections.emptyList());
+ @SuppressWarnings("unchecked")
+ ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
+ when(masterServices.getMasterProcedureExecutor()).thenReturn(procExec);
+ when(procExec.getProcedures()).thenReturn(Collections.emptyList());
}
/**
* This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same
- * oldWALs directory. Created files: - 2 invalid files - 5 old Procedure WALs - 30 old WALs from
- * which 3 are in replication - 5 recent Procedure WALs - 1 recent WAL - 1 very new WAL (timestamp
- * in future) - masterProcedureWALs subdirectory Files which should stay: - 3 replication WALs - 2
- * new WALs - 5 latest Procedure WALs - masterProcedureWALs subdirectory
+ * oldWALs directory.
+ * <p/>
+ * Created files:
+ * <ul>
+ * <li>2 invalid files</li>
+ * <li>5 old Procedure WALs</li>
+ * <li>30 old WALs from which 3 are in replication</li>
+ * <li>5 recent Procedure WALs</li>
+ * <li>1 recent WAL</li>
+ * <li>1 very new WAL (timestamp in future)</li>
+ * <li>masterProcedureWALs subdirectory</li>
+ * </ul>
+ * Files which should stay:
+ * <ul>
+ * <li>3 replication WALs</li>
+ * <li>2 new WALs</li>
+ * <li>5 latest Procedure WALs</li>
+ * <li>masterProcedureWALs subdirectory</li>
+ * </ul>
*/
@Test
public void testLogCleaning() throws Exception {
@@ -131,9 +178,6 @@ public class TestLogsCleaner {
HMaster.decorateMasterConfiguration(conf);
Server server = new DummyServer();
- ReplicationQueueStorage queueStorage = ReplicationStorageFactory
- .getReplicationQueueStorage(ConnectionFactory.createConnection(conf), conf);
-
String fakeMachineName =
URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name());
@@ -159,14 +203,12 @@ public class TestLogsCleaner {
for (int i = 1; i <= 30; i++) {
Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
fs.createNewFile(fileName);
- // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
- // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
- if (i % (30 / 3) == 0) {
- // queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
- LOG.info("Replication log file: " + fileName);
- }
}
-
+ // Case 4: the newest 3 WALs will be kept because they are beyond the replication offset
+ masterServices.getReplicationPeerManager().listPeers(null)
+ .add(new ReplicationPeerDescription(peerId, true, null, null));
+ queueStorage.setOffset(new ReplicationQueueId(server.getServerName(), peerId), fakeMachineName,
+ new ReplicationGroupOffset(fakeMachineName + "." + (now - 3), 0), Collections.emptyMap());
// Case 5: 5 Procedure WALs that are new, will stay
for (int i = 6; i <= 10; i++) {
Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
@@ -189,7 +231,8 @@ public class TestLogsCleaner {
// 10 procedure WALs
assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
- LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null);
+ LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL,
+ ImmutableMap.of(HMaster.MASTER, masterServices));
cleaner.chore();
// In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
@@ -208,98 +251,14 @@ public class TestLogsCleaner {
}
}
- @Test
- public void testZooKeeperRecoveryDuringGetListOfReplicators() throws Exception {
- ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
-
- List<FileStatus> dummyFiles = Arrays.asList(
- new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log1")),
- new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log2")));
-
- FaultyZooKeeperWatcher faultyZK =
- new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
- final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
-
- try {
- faultyZK.init(false);
- ReplicationQueueStorage queueStorage = spy(ReplicationStorageFactory
- .getReplicationQueueStorage(ConnectionFactory.createConnection(conf), conf));
- // doAnswer(new Answer<Object>() {
- // @Override
- // public Object answer(InvocationOnMock invocation) throws Throwable {
- // try {
- // return invocation.callRealMethod();
- // } catch (ReplicationException e) {
- // LOG.debug("Caught Exception", e);
- // getListOfReplicatorsFailed.set(true);
- // throw e;
- // }
- // }
- // }).when(queueStorage).getAllWALs();
-
- cleaner.setConf(conf, faultyZK, queueStorage);
- // should keep all files due to a ConnectionLossException getting the queues znodes
- cleaner.preClean();
- Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
-
- assertTrue(getListOfReplicatorsFailed.get());
- assertFalse(toDelete.iterator().hasNext());
- assertFalse(cleaner.isStopped());
-
- // zk recovery.
- faultyZK.init(true);
- cleaner.preClean();
- Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
- Iterator<FileStatus> iter = filesToDelete.iterator();
- assertTrue(iter.hasNext());
- assertEquals(new Path("log1"), iter.next().getPath());
- assertTrue(iter.hasNext());
- assertEquals(new Path("log2"), iter.next().getPath());
- assertFalse(iter.hasNext());
-
- } finally {
- faultyZK.close();
- }
- }
-
- /**
- * When zk is working both files should be returned
- * @throws Exception from ZK watcher
- */
- @Test
- public void testZooKeeperNormal() throws Exception {
- ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
-
- // Subtract 1000 from current time so modtime is for sure older
- // than 'now'.
- long modTime = EnvironmentEdgeManager.currentTime() - 1000;
- List<FileStatus> dummyFiles =
- Arrays.asList(new FileStatus(100, false, 3, 100, modTime, new Path("log1")),
- new FileStatus(100, false, 3, 100, modTime, new Path("log2")));
-
- ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
- try {
- cleaner.setConf(conf, zkw);
- cleaner.preClean();
- Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
- Iterator<FileStatus> iter = filesToDelete.iterator();
- assertTrue(iter.hasNext());
- assertEquals(new Path("log1"), iter.next().getPath());
- assertTrue(iter.hasNext());
- assertEquals(new Path("log2"), iter.next().getPath());
- assertFalse(iter.hasNext());
- } finally {
- zkw.close();
- }
- }
-
@Test
public void testOnConfigurationChange() throws Exception {
// Prepare environments
Server server = new DummyServer();
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
- LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null);
+ LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL,
+ ImmutableMap.of(HMaster.MASTER, masterServices));
int size = cleaner.getSizeOfCleaners();
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
cleaner.getCleanerThreadTimeoutMsec());
@@ -338,7 +297,7 @@ public class TestLogsCleaner {
}
}
- static class DummyServer extends MockServer {
+ private static final class DummyServer extends MockServer {
@Override
public Configuration getConfiguration() {
@@ -355,26 +314,4 @@ public class TestLogsCleaner {
return null;
}
}
-
- static class FaultyZooKeeperWatcher extends ZKWatcher {
- private RecoverableZooKeeper zk;
-
- public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
- throws ZooKeeperConnectionException, IOException {
- super(conf, identifier, abortable);
- }
-
- public void init(boolean autoRecovery) throws Exception {
- this.zk = spy(super.getRecoverableZooKeeper());
- if (!autoRecovery) {
- doThrow(new KeeperException.ConnectionLossException()).when(zk)
- .getChildren("/hbase/replication/rs", null);
- }
- }
-
- @Override
- public RecoverableZooKeeper getRecoverableZooKeeper() {
- return zk;
- }
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 1d527f35bb0..452ad981fb1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -26,6 +26,7 @@ import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +35,9 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -48,19 +51,19 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.MockServer;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// TODO: revisit later
-@Ignore
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
@Category({ MasterTests.class, SmallTests.class })
public class TestReplicationHFileCleaner {
@@ -71,19 +74,25 @@ public class TestReplicationHFileCleaner {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class);
private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static Server server;
+ private static final TableName tableName = TableName.valueOf("test_cleaner");
private static ReplicationQueueStorage rq;
private static ReplicationPeers rp;
private static final String peerId = "TestReplicationHFileCleaner";
private static Configuration conf = TEST_UTIL.getConfiguration();
- static FileSystem fs = null;
- Path root;
+ private static FileSystem fs = null;
+ private static Map<String, Object> params;
+ private Path root;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster();
server = new DummyServer();
+ params = ImmutableMap.of(HMaster.MASTER, server);
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
HMaster.decorateMasterConfiguration(conf);
+ TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
+ TEST_UTIL.getAdmin().createTable(td);
+ conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
rp.init();
rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getConnection(), conf);
@@ -92,7 +101,7 @@ public class TestReplicationHFileCleaner {
@AfterClass
public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniZKCluster();
+ TEST_UTIL.shutdownMiniCluster();
}
@Before
@@ -115,6 +124,13 @@ public class TestReplicationHFileCleaner {
rp.getPeerStorage().removePeer(peerId);
}
+ private ReplicationHFileCleaner createCleaner() {
+ ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+ cleaner.setConf(conf);
+ cleaner.init(params);
+ return cleaner;
+ }
+
@Test
public void testIsFileDeletable() throws IOException, ReplicationException {
// 1. Create a file
@@ -122,8 +138,7 @@ public class TestReplicationHFileCleaner {
fs.createNewFile(file);
// 2. Assert file is successfully created
assertTrue("Test file not created!", fs.exists(file));
- ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
- cleaner.setConf(conf);
+ ReplicationHFileCleaner cleaner = createCleaner();
// 3. Assert that file as is should be deletable
assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
+ "for it in the queue.", cleaner.isFileDeletable(fs.getFileStatus(file)));
@@ -160,8 +175,7 @@ public class TestReplicationHFileCleaner {
// 2. Add one file to hfile-refs queue
rq.addHFileRefs(peerId, hfiles);
- ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
- cleaner.setConf(conf);
+ ReplicationHFileCleaner cleaner = createCleaner();
Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator();
int i = 0;
while (deletableFilesIterator.hasNext() && i < 2) {
@@ -182,6 +196,15 @@ public class TestReplicationHFileCleaner {
return TEST_UTIL.getConfiguration();
}
+ @Override
+ public ZKWatcher getZooKeeper() {
+ try {
+ return TEST_UTIL.getZooKeeperWatcher();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
@Override
public Connection getConnection() {
try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationOffsetUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationOffsetUtil.java
new file mode 100644
index 00000000000..f54a4958374
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationOffsetUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.apache.hadoop.hbase.replication.ReplicationOffsetUtil.shouldReplicate;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, SmallTests.class })
+public class TestReplicationOffsetUtil {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationOffsetUtil.class);
+
+ @Test
+ public void test() {
+ assertTrue(shouldReplicate(null, "whatever"));
+ assertTrue(shouldReplicate(ReplicationGroupOffset.BEGIN, "whatever"));
+ ServerName sn = ServerName.valueOf("host", 16010, EnvironmentEdgeManager.currentTime());
+ ReplicationGroupOffset offset = new ReplicationGroupOffset(sn + ".12345", 100);
+ assertTrue(shouldReplicate(offset, sn + ".12346"));
+ assertFalse(shouldReplicate(offset, sn + ".12344"));
+ assertTrue(shouldReplicate(offset, sn + ".12345"));
+ // -1 means finish replication, so should not replicate
+ assertFalse(shouldReplicate(new ReplicationGroupOffset(sn + ".12345", -1), sn + ".12345"));
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestLogCleanerBarrier.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestLogCleanerBarrier.java
new file mode 100644
index 00000000000..06cb85523d3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestLogCleanerBarrier.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestLogCleanerBarrier {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestLogCleanerBarrier.class);
+
+ @Test
+ public void test() {
+ ReplicationLogCleanerBarrier barrier = new ReplicationLogCleanerBarrier();
+ assertThrows(IllegalStateException.class, () -> barrier.stop());
+ assertThrows(IllegalStateException.class, () -> barrier.enable());
+ assertTrue(barrier.start());
+ assertThrows(IllegalStateException.class, () -> barrier.start());
+ assertThrows(IllegalStateException.class, () -> barrier.enable());
+ assertFalse(barrier.disable());
+ assertThrows(IllegalStateException.class, () -> barrier.enable());
+ barrier.stop();
+
+ for (int i = 0; i < 3; i++) {
+ assertTrue(barrier.disable());
+ assertFalse(barrier.start());
+ }
+ for (int i = 0; i < 3; i++) {
+ assertFalse(barrier.start());
+ barrier.enable();
+ }
+ assertTrue(barrier.start());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java
new file mode 100644
index 00000000000..7a227fb0603
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Tests {@link ReplicationLogCleaner} against a mocked {@link MasterServices}. Each test feeds
+ * one or more {@link FileStatus} entries through the cleaner and checks which of them are
+ * returned as deletable.
+ */
+@Category({ MasterTests.class, SmallTests.class })
+public class TestReplicationLogCleaner {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationLogCleaner.class);
+
+ // Configuration passed to the cleaner in setUp(); shared by all tests.
+ private static final Configuration CONF = HBaseConfiguration.create();
+
+ // Mockito mock of the master, recreated in setUp() before every test.
+ private MasterServices services;
+
+ // Cleaner under test, recreated and initialised in setUp() before every test.
+ private ReplicationLogCleaner cleaner;
+
+ /**
+  * Builds the mocked master environment and a freshly initialised cleaner.
+  * <p>
+  * Every collection-returning stub hands out one mutable {@link ArrayList} created here, so
+  * the {@code add*} helpers of this class can populate peers, queues, online servers and
+  * procedures simply by calling the mocked getter again and adding to the returned list.
+  */
+ @Before
+ public void setUp() throws ReplicationException {
+   // Replication peer manager with its barrier, peer list and queue storage.
+   ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
+   ReplicationQueueStorage queueStorage = mock(ReplicationQueueStorage.class);
+   when(peerManager.getReplicationLogCleanerBarrier())
+     .thenReturn(new ReplicationLogCleanerBarrier());
+   when(peerManager.listPeers(null)).thenReturn(new ArrayList<>());
+   when(peerManager.getQueueStorage()).thenReturn(queueStorage);
+   when(queueStorage.listAllQueues()).thenReturn(new ArrayList<>());
+
+   // Server manager that initially reports no online region servers.
+   ServerManager serverManager = mock(ServerManager.class);
+   when(serverManager.getOnlineServersList()).thenReturn(new ArrayList<>());
+
+   // Procedure executor that initially holds no procedures.
+   @SuppressWarnings("unchecked")
+   ProcedureExecutor<MasterProcedureEnv> procedureExecutor = mock(ProcedureExecutor.class);
+   when(procedureExecutor.getProcedures()).thenReturn(new ArrayList<>());
+
+   // Wire the collaborators into the mocked master.
+   services = mock(MasterServices.class);
+   when(services.getReplicationPeerManager()).thenReturn(peerManager);
+   when(services.getServerManager()).thenReturn(serverManager);
+   when(services.getMasterProcedureExecutor()).thenReturn(procedureExecutor);
+
+   // The cleaner receives the master through the init parameter map.
+   Map<String, Object> initParams = ImmutableMap.of(HMaster.MASTER, services);
+   cleaner = new ReplicationLogCleaner();
+   cleaner.setConf(CONF);
+   cleaner.init(initParams);
+ }
+
+ /**
+  * Calls {@code postClean()} on the shared cleaner after every test. The tests invoke
+  * {@code preClean()} through {@link #runCleaner}, so this closes that round.
+  * NOTE(review): what postClean() releases is not visible here; confirm against
+  * ReplicationLogCleaner.
+  */
+ @After
+ public void tearDown() {
+ cleaner.postClean();
+ }
+
+ /**
+  * Runs one cleaning pass: calls {@code preClean()} on the given cleaner and then asks it
+  * which of the supplied files are deletable. {@code postClean()} is left to the caller.
+  * @param cleaner the cleaner to drive
+  * @param files candidate files handed to {@code getDeletableFiles}
+  * @return whatever {@code getDeletableFiles} returns for {@code files}
+  */
+ private static Iterable<FileStatus> runCleaner(ReplicationLogCleaner cleaner,
+ Iterable<FileStatus> files) {
+ cleaner.preClean();
+ return cleaner.getDeletableFiles(files);
+ }
+
+ private static FileStatus createFileStatus(Path path) {
+ return new FileStatus(100, false, 3, 256, EnvironmentEdgeManager.currentTime(), path);
+ }
+
+ /**
+  * Creates a {@link FileStatus} whose path is the server name followed by {@code "."} and
+  * {@code number}.
+  */
+ private static FileStatus createFileStatus(ServerName sn, int number) {
+   return createFileStatus(new Path(sn.toString() + "." + number));
+ }
+
+ /**
+  * Creates a peer description for {@code peerId}, passing {@code true} and two {@code null}
+  * values for the remaining constructor arguments.
+  * NOTE(review): presumably these are (enabled, peer config, sync replication state); confirm
+  * against ReplicationPeerDescription.
+  */
+ private static ReplicationPeerDescription createPeer(String peerId) {
+ return new ReplicationPeerDescription(peerId, true, null, null);
+ }
+
+ /**
+  * Adds {@code serverName} to the list returned by the mocked
+  * {@code ServerManager.getOnlineServersList()}; setUp() stubs that getter with a mutable
+  * list, so the addition is visible to the cleaner.
+  */
+ private void addServer(ServerName serverName) {
+ services.getServerManager().getOnlineServersList().add(serverName);
+ }
+
+ /**
+  * Registers a mocked {@link ServerCrashProcedure} for {@code serverName} with the mocked
+  * procedure executor.
+  * @param serverName server the procedure reports through {@code getServerName()}
+  * @param finished value the procedure reports through {@code isFinished()}
+  */
+ private void addSCP(ServerName serverName, boolean finished) {
+   ServerCrashProcedure crashProcedure = mock(ServerCrashProcedure.class);
+   when(crashProcedure.isFinished()).thenReturn(finished);
+   when(crashProcedure.getServerName()).thenReturn(serverName);
+   services.getMasterProcedureExecutor().getProcedures().add(crashProcedure);
+ }
+
+ /**
+  * Appends one peer description per given id, in order, to the list returned by the mocked
+  * {@code ReplicationPeerManager.listPeers(null)}.
+  */
+ private void addPeer(String... peerIds) {
+   // Build the descriptions first and add them in one go, as a single addAll call.
+   List<ReplicationPeerDescription> newPeers = new ArrayList<>();
+   for (String peerId : peerIds) {
+     newPeers.add(createPeer(peerId));
+   }
+   services.getReplicationPeerManager().listPeers(null).addAll(newPeers);
+ }
+
+ /**
+  * Appends the given queue entries, in order, to the list returned by the mocked
+  * {@code ReplicationQueueStorage.listAllQueues()}.
+  */
+ private void addQueueData(ReplicationQueueData... datas) throws ReplicationException {
+   List<ReplicationQueueData> additions = Arrays.asList(datas);
+   ReplicationQueueStorage queueStorage =
+     services.getReplicationPeerManager().getQueueStorage();
+   queueStorage.listAllQueues().addAll(additions);
+ }
+
+ /**
+  * A cleaner on which neither {@code setConf} nor {@code init} was called is expected to hand
+  * back the very same {@link Iterable} it was given.
+  */
+ @Test
+ public void testNoConf() {
+   // Deliberately a fresh instance instead of the initialised field.
+   ReplicationLogCleaner unconfigured = new ReplicationLogCleaner();
+   List<FileStatus> candidates = Arrays.asList(new FileStatus());
+   Iterable<FileStatus> deletable = runCleaner(unconfigured, candidates);
+   assertSame(candidates, deletable);
+   unconfigured.postClean();
+ }
+
+ @Test
+ public void testCanNotFilter() {
+ assertTrue(services.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable());
+ List<FileStatus> files = Arrays.asList(new FileStatus());
+ assertSame(Collections.emptyList(), runCleaner(cleaner, files));
+ }
+
+ /**
+  * With no replication peer configured, a file with a valid WAL name is expected to be
+  * reported as deletable.
+  */
+ @Test
+ public void testNoPeer() {
+   Path walPath = new Path("/wal." + EnvironmentEdgeManager.currentTime());
+   assertTrue(AbstractFSWALProvider.validateWALFilename(walPath.getName()));
+   FileStatus wal = createFileStatus(walPath);
+   Iterable<FileStatus> deletable = runCleaner(cleaner, Arrays.asList(wal));
+   // Exactly the one input file must come back.
+   Iterator<FileStatus> it = deletable.iterator();
+   assertSame(wal, it.next());
+   assertFalse(it.hasNext());
+ }
+
+ /**
+  * With one peer configured, a file whose name is not a valid WAL file name is expected to be
+  * reported as deletable.
+  */
+ @Test
+ public void testNotValidWalFile() {
+   addPeer("1");
+   Path bogusPath = new Path("/whatever");
+   assertFalse(AbstractFSWALProvider.validateWALFilename(bogusPath.getName()));
+   FileStatus bogusFile = createFileStatus(bogusPath);
+   Iterable<FileStatus> deletable = runCleaner(cleaner, Arrays.asList(bogusFile));
+   // Exactly the one input file must come back.
+   Iterator<FileStatus> it = deletable.iterator();
+   assertSame(bogusFile, it.next());
+   assertFalse(it.hasNext());
+ }
+
+ /**
+  * With one peer configured, a WAL file that {@code AbstractFSWALProvider.isMetaFile}
+  * recognises as a meta WAL is expected to be reported as deletable.
+  */
+ @Test
+ public void testMetaWalFile() {
+   addPeer("1");
+   String metaWalName =
+     "/wal." + EnvironmentEdgeManager.currentTime() + AbstractFSWALProvider.META_WAL_PROVIDER_ID;
+   Path metaWalPath = new Path(metaWalName);
+   assertTrue(AbstractFSWALProvider.validateWALFilename(metaWalPath.getName()));
+   assertTrue(AbstractFSWALProvider.isMetaFile(metaWalPath));
+   FileStatus metaWal = createFileStatus(metaWalPath);
+   Iterable<FileStatus> deletable = runCleaner(cleaner, Arrays.asList(metaWal));
+   // Exactly the one input file must come back.
+   Iterator<FileStatus> it = deletable.iterator();
+   assertSame(metaWal, it.next());
+   assertFalse(it.hasNext());
+ }
+
+ /**
+  * One peer, the WAL's server is online, and no replication queue exists: the WAL is expected
+  * not to be deletable.
+  */
+ @Test
+ public void testLiveRegionServerNoQueues() {
+   addPeer("1");
+   ServerName liveServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   addServer(liveServer);
+   FileStatus wal = createFileStatus(liveServer, 1);
+   Iterable<FileStatus> deletable = runCleaner(cleaner, Arrays.asList(wal));
+   assertThat(deletable, emptyIterable());
+ }
+
+ /**
+  * One peer, the WAL's server is not in the online list but has an unfinished
+  * {@link ServerCrashProcedure}, and no replication queue exists: the WAL is expected not to
+  * be deletable.
+  */
+ @Test
+ public void testLiveRegionServerWithSCPNoQueues() {
+   addPeer("1");
+   ServerName crashingServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   addSCP(crashingServer, false);
+   FileStatus wal = createFileStatus(crashingServer, 1);
+   Iterable<FileStatus> deletable = runCleaner(cleaner, Arrays.asList(wal));
+   assertThat(deletable, emptyIterable());
+ }
+
+ /**
+  * One peer, the WAL's server is neither online nor covered by a crash procedure, and no
+  * replication queue exists: the WAL is expected to be deletable.
+  */
+ @Test
+ public void testDeadRegionServerNoQueues() {
+   addPeer("1");
+   ServerName deadServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   FileStatus wal = createFileStatus(deadServer, 1);
+   Iterable<FileStatus> deletable = runCleaner(cleaner, Arrays.asList(wal));
+   // Exactly the one input file must come back.
+   Iterator<FileStatus> it = deletable.iterator();
+   assertSame(wal, it.next());
+   assertFalse(it.hasNext());
+ }
+
+ /**
+  * One peer, the WAL's server has a finished {@link ServerCrashProcedure}, and no replication
+  * queue exists: the WAL is expected to be deletable.
+  */
+ @Test
+ public void testDeadRegionServerWithSCPNoQueues() {
+   addPeer("1");
+   ServerName deadServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   addSCP(deadServer, true);
+   FileStatus wal = createFileStatus(deadServer, 1);
+   Iterable<FileStatus> deletable = runCleaner(cleaner, Arrays.asList(wal));
+   // Exactly the one input file must come back.
+   Iterator<FileStatus> it = deletable.iterator();
+   assertSame(wal, it.next());
+   assertFalse(it.hasNext());
+ }
+
+ /**
+  * Two peers and an online server, but a replication queue exists only for the first peer:
+  * the WAL is expected not to be deletable.
+  */
+ @Test
+ public void testLiveRegionServerMissingQueue() throws ReplicationException {
+   final String firstPeer = "1";
+   final String secondPeer = "2";
+   addPeer(firstPeer, secondPeer);
+   ServerName liveServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   addServer(liveServer);
+   FileStatus wal = createFileStatus(liveServer, 1);
+   String walName = wal.getPath().getName();
+   // No queue is registered for secondPeer on purpose.
+   ReplicationQueueId firstQueueId = new ReplicationQueueId(liveServer, firstPeer);
+   ReplicationGroupOffset firstOffset = new ReplicationGroupOffset(walName, -1);
+   addQueueData(
+     new ReplicationQueueData(firstQueueId, ImmutableMap.of(liveServer.toString(), firstOffset)));
+   assertThat(runCleaner(cleaner, Arrays.asList(wal)), emptyIterable());
+ }
+
+ /**
+  * One peer and an online server whose queue records offset 0 in the WAL: the WAL is expected
+  * not to be deletable.
+  */
+ @Test
+ public void testLiveRegionServerShouldNotDelete() throws ReplicationException {
+   final String peer = "1";
+   addPeer(peer);
+   ServerName liveServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   addServer(liveServer);
+   FileStatus wal = createFileStatus(liveServer, 1);
+   String walName = wal.getPath().getName();
+   ReplicationQueueId queueId = new ReplicationQueueId(liveServer, peer);
+   ReplicationGroupOffset offset = new ReplicationGroupOffset(walName, 0);
+   addQueueData(
+     new ReplicationQueueData(queueId, ImmutableMap.of(liveServer.toString(), offset)));
+   assertThat(runCleaner(cleaner, Arrays.asList(wal)), emptyIterable());
+ }
+
+ /**
+  * Two peers and an online server; the first peer's queue records offset -1 and the second
+  * peer's queue offset 0 for the same WAL: the WAL is expected not to be deletable.
+  */
+ @Test
+ public void testLiveRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
+   final String firstPeer = "1";
+   final String secondPeer = "2";
+   addPeer(firstPeer, secondPeer);
+   ServerName liveServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   addServer(liveServer);
+   FileStatus wal = createFileStatus(liveServer, 1);
+   String walName = wal.getPath().getName();
+   String walGroup = liveServer.toString();
+   ReplicationQueueData firstQueue =
+     new ReplicationQueueData(new ReplicationQueueId(liveServer, firstPeer),
+       ImmutableMap.of(walGroup, new ReplicationGroupOffset(walName, -1)));
+   ReplicationQueueData secondQueue =
+     new ReplicationQueueData(new ReplicationQueueId(liveServer, secondPeer),
+       ImmutableMap.of(walGroup, new ReplicationGroupOffset(walName, 0)));
+   addQueueData(firstQueue, secondQueue);
+   assertThat(runCleaner(cleaner, Arrays.asList(wal)), emptyIterable());
+ }
+
+ /**
+  * One peer and an online server whose queue records offset -1 for the WAL: the WAL is
+  * expected to be deletable.
+  * NOTE(review): offset -1 presumably marks the WAL as fully replicated; confirm against
+  * ReplicationOffsetUtil.
+  */
+ @Test
+ public void testLiveRegionServerShouldDelete() throws ReplicationException {
+   String peerId = "1";
+   addPeer(peerId);
+   ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   addServer(sn);
+   FileStatus file = createFileStatus(sn, 1);
+   ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
+     ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+   // Register the queue through the shared helper, like every other test in this class,
+   // instead of reaching into the mocked queue storage directly.
+   addQueueData(data);
+   Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+   // Exactly the one input file must come back.
+   assertSame(file, iter.next());
+   assertFalse(iter.hasNext());
+ }
+
+ /**
+  * Two peers and an online server; both peers' queues record offset -1 for the WAL: the WAL
+  * is expected to be deletable.
+  */
+ @Test
+ public void testLiveRegionServerShouldDeleteTwoPeers() throws ReplicationException {
+   final String firstPeer = "1";
+   final String secondPeer = "2";
+   addPeer(firstPeer, secondPeer);
+   ServerName liveServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   addServer(liveServer);
+   FileStatus wal = createFileStatus(liveServer, 1);
+   String walName = wal.getPath().getName();
+   String walGroup = liveServer.toString();
+   ReplicationQueueData firstQueue =
+     new ReplicationQueueData(new ReplicationQueueId(liveServer, firstPeer),
+       ImmutableMap.of(walGroup, new ReplicationGroupOffset(walName, -1)));
+   ReplicationQueueData secondQueue =
+     new ReplicationQueueData(new ReplicationQueueId(liveServer, secondPeer),
+       ImmutableMap.of(walGroup, new ReplicationGroupOffset(walName, -1)));
+   addQueueData(firstQueue, secondQueue);
+   Iterable<FileStatus> deletable = runCleaner(cleaner, Arrays.asList(wal));
+   // Exactly the one input file must come back.
+   Iterator<FileStatus> it = deletable.iterator();
+   assertSame(wal, it.next());
+   assertFalse(it.hasNext());
+ }
+
+ /**
+  * Two peers, the server is not online, and only the first peer has a queue (offset -1): the
+  * WAL is expected to be deletable.
+  */
+ @Test
+ public void testDeadRegionServerMissingQueue() throws ReplicationException {
+   final String firstPeer = "1";
+   final String secondPeer = "2";
+   addPeer(firstPeer, secondPeer);
+   ServerName deadServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   FileStatus wal = createFileStatus(deadServer, 1);
+   String walName = wal.getPath().getName();
+   // No queue is registered for secondPeer on purpose.
+   ReplicationQueueId firstQueueId = new ReplicationQueueId(deadServer, firstPeer);
+   ReplicationGroupOffset firstOffset = new ReplicationGroupOffset(walName, -1);
+   addQueueData(
+     new ReplicationQueueData(firstQueueId, ImmutableMap.of(deadServer.toString(), firstOffset)));
+   Iterable<FileStatus> deletable = runCleaner(cleaner, Arrays.asList(wal));
+   // Exactly the one input file must come back.
+   Iterator<FileStatus> it = deletable.iterator();
+   assertSame(wal, it.next());
+   assertFalse(it.hasNext());
+ }
+
+ /**
+  * One peer, the server is not online, and its queue records offset 0 in the WAL: the WAL is
+  * expected not to be deletable.
+  */
+ @Test
+ public void testDeadRegionServerShouldNotDelete() throws ReplicationException {
+   final String peer = "1";
+   addPeer(peer);
+   ServerName deadServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   FileStatus wal = createFileStatus(deadServer, 1);
+   String walName = wal.getPath().getName();
+   ReplicationQueueId queueId = new ReplicationQueueId(deadServer, peer);
+   ReplicationGroupOffset offset = new ReplicationGroupOffset(walName, 0);
+   addQueueData(
+     new ReplicationQueueData(queueId, ImmutableMap.of(deadServer.toString(), offset)));
+   assertThat(runCleaner(cleaner, Arrays.asList(wal)), emptyIterable());
+ }
+
+ /**
+  * Two peers and a server that is not online; the first peer's queue records offset -1 and
+  * the second peer's queue offset 0 for the same WAL: the WAL is expected not to be deletable.
+  */
+ @Test
+ public void testDeadRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
+   final String firstPeer = "1";
+   final String secondPeer = "2";
+   addPeer(firstPeer, secondPeer);
+   ServerName deadServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   FileStatus wal = createFileStatus(deadServer, 1);
+   String walName = wal.getPath().getName();
+   String walGroup = deadServer.toString();
+   ReplicationQueueData firstQueue =
+     new ReplicationQueueData(new ReplicationQueueId(deadServer, firstPeer),
+       ImmutableMap.of(walGroup, new ReplicationGroupOffset(walName, -1)));
+   ReplicationQueueData secondQueue =
+     new ReplicationQueueData(new ReplicationQueueId(deadServer, secondPeer),
+       ImmutableMap.of(walGroup, new ReplicationGroupOffset(walName, 0)));
+   addQueueData(firstQueue, secondQueue);
+   assertThat(runCleaner(cleaner, Arrays.asList(wal)), emptyIterable());
+ }
+
+ /**
+  * One peer and a server that is not online, whose queue records offset -1 for the WAL: the
+  * WAL is expected to be deletable.
+  * NOTE(review): offset -1 presumably marks the WAL as fully replicated; confirm against
+  * ReplicationOffsetUtil.
+  */
+ @Test
+ public void testDeadRegionServerShouldDelete() throws ReplicationException {
+   String peerId = "1";
+   addPeer(peerId);
+   ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   FileStatus file = createFileStatus(sn, 1);
+   ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
+     ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
+   // Register the queue through the shared helper, like every other test in this class,
+   // instead of reaching into the mocked queue storage directly.
+   addQueueData(data);
+   Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
+   // Exactly the one input file must come back.
+   assertSame(file, iter.next());
+   assertFalse(iter.hasNext());
+ }
+
+ /**
+  * Two peers and a server that is not online; both peers' queues record offset -1 for the
+  * WAL: the WAL is expected to be deletable.
+  */
+ @Test
+ public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationException {
+   final String firstPeer = "1";
+   final String secondPeer = "2";
+   addPeer(firstPeer, secondPeer);
+   ServerName deadServer =
+     ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
+   FileStatus wal = createFileStatus(deadServer, 1);
+   String walName = wal.getPath().getName();
+   String walGroup = deadServer.toString();
+   ReplicationQueueData firstQueue =
+     new ReplicationQueueData(new ReplicationQueueId(deadServer, firstPeer),
+       ImmutableMap.of(walGroup, new ReplicationGroupOffset(walName, -1)));
+   ReplicationQueueData secondQueue =
+     new ReplicationQueueData(new ReplicationQueueId(deadServer, secondPeer),
+       ImmutableMap.of(walGroup, new ReplicationGroupOffset(walName, -1)));
+   addQueueData(firstQueue, secondQueue);
+   Iterable<FileStatus> deletable = runCleaner(cleaner, Arrays.asList(wal));
+   // Exactly the one input file must come back.
+   Iterator<FileStatus> it = deletable.iterator();
+   assertSame(wal, it.next());
+   assertFalse(it.hasNext());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9496afb780c..669364c278f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -190,7 +190,7 @@ public class TestReplicationSourceManager {
replication = new Replication();
replication.initialize(server, FS, logDir, oldLogDir,
- new WALFactory(CONF, "test", null, false));
+ new WALFactory(CONF, server.getServerName(), null, false));
manager = replication.getReplicationManager();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index f2f73c37c6f..e6a2cca4246 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -671,7 +671,7 @@ public class TestWALFactory {
assertEquals(wrappedWALProvider.getClass(), walFactory.getMetaProvider().getClass());
// if providers are not set and do not enable SyncReplicationWALProvider
- walFactory = new WALFactory(conf, this.currentServername.toString(), null, false);
+ walFactory = new WALFactory(conf, this.currentServername, null, false);
assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
index 8273b3d6041..6a1e98d9fd5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -183,4 +184,17 @@ public class TestWALMethods {
return entry;
}
+ @Test
+ public void testParseServerNameFromWALName() {
+ assertEquals(ServerName.valueOf("abc,123,123"),
+ AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123.1.12345.meta"));
+ assertEquals(ServerName.valueOf("abc,123,123"),
+ AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123.12345"));
+ assertEquals(ServerName.valueOf("abc,123,123"),
+ AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123"));
+ assertThrows(IllegalArgumentException.class,
+ () -> AbstractFSWALProvider.parseServerNameFromWALName("test,abc,123,123.12345"));
+ assertThrows(IllegalArgumentException.class,
+ () -> AbstractFSWALProvider.parseServerNameFromWALName("abc"));
+ }
}