Posted to commits@hbase.apache.org by su...@apache.org on 2021/02/23 01:57:43 UTC
[hbase] 03/10: HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)
This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit d4bcf8d9357e3b4e9984255742505649d892681b
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Tue Aug 11 20:07:09 2020 +0800
HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../regionserver/CatalogReplicationSource.java | 13 +-
.../regionserver/RecoveredReplicationSource.java | 18 ++-
.../regionserver/ReplicationSource.java | 166 ++++++++++++++++++---
.../regionserver/ReplicationSourceInterface.java | 39 +++--
.../regionserver/ReplicationSourceManager.java | 147 ++----------------
.../regionserver/ReplicationSourceShipper.java | 21 +--
.../regionserver/ReplicationSourceWALReader.java | 16 +-
.../replication/regionserver/WALEntryBatch.java | 2 +-
.../hbase/replication/ReplicationSourceDummy.java | 24 +--
.../regionserver/TestReplicationSource.java | 16 +-
.../regionserver/TestReplicationSourceManager.java | 50 ++++---
.../regionserver/TestWALEntryStream.java | 20 ++-
12 files changed, 276 insertions(+), 256 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
index 8cb7860..15370e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
@@ -35,7 +35,18 @@ class CatalogReplicationSource extends ReplicationSource {
}
@Override
- public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
+ public void setWALPosition(WALEntryBatch entryBatch) {
+ // Noop. This CatalogReplicationSource implementation does not persist state to backing storage
+ // nor does it keep its WALs in a general map up in ReplicationSourceManager --
+ // CatalogReplicationSource is used by the Catalog Read Replica feature, which resets every time
+ // the WAL source process crashes. Skip calling through to the default implementation.
+ // See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
+ // design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for
+ // background on why there is no need to keep WAL state.
+ }
+
+ @Override
+ public void cleanOldWALs(String log, boolean inclusive) {
// Noop. This CatalogReplicationSource implementation does not persist state to backing storage
// nor does it keep its WALs in a general map up in ReplicationSourceManager --
// CatalogReplicationSource is used by the Catalog Read Replica feature, which resets every time
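A note on the pattern above: the refactor splits position tracking and log cleanup into two hooks, and an ephemeral source simply overrides both with no-ops. A minimal standalone sketch of that shape, with illustrative names rather than the HBase classes:

    // Illustrative only, not HBase code: an ephemeral source opts out of persistence.
    class TrackedSource {
        void setPosition(long position) {
            System.out.println("persist position " + position); // stands in for queue storage
        }
        void cleanOldLogs(String log, boolean inclusive) {
            System.out.println("prune logs up to " + log + " inclusive=" + inclusive);
        }
    }

    class EphemeralSource extends TrackedSource {
        @Override
        void setPosition(long position) {
            // Noop: state is rebuilt from scratch after a crash, nothing to persist.
        }
        @Override
        void cleanOldLogs(String log, boolean inclusive) {
            // Noop: no queue state was recorded, so there is nothing to prune.
        }
    }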
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 526c3e3..abbc046 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -44,15 +45,18 @@ public class RecoveredReplicationSource extends ReplicationSource {
private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
+ private Path walDir;
+
private String actualPeerId;
@Override
- public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
- ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
- MetricsSource metrics) throws IOException {
- super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
+ public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+ ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+ String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+ MetricsSource metrics) throws IOException {
+ super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
clusterId, walFileLengthProvider, metrics);
+ this.walDir = walDir;
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
@@ -93,7 +97,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
for (Path possibleLogLocation : locs) {
LOG.info("Possible location " + possibleLogLocation.toUri().toString());
- if (manager.getFs().exists(possibleLogLocation)) {
+ if (this.fs.exists(possibleLogLocation)) {
// We found the right new location
LOG.info("Log " + path + " still exists at " + possibleLogLocation);
newPaths.add(possibleLogLocation);
@@ -126,7 +130,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
// N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
// area rather than to the wal area for a particular region server.
private Path getReplSyncUpPath(Path path) throws IOException {
- FileStatus[] rss = fs.listStatus(manager.getLogDir());
+ FileStatus[] rss = fs.listStatus(walDir);
for (FileStatus rs : rss) {
Path p = rs.getPath();
FileStatus[] logs = fs.listStatus(p);
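For context, getReplSyncUpPath above now scans the injected walDir directly instead of asking the manager for it. A hedged sketch of that scan using Hadoop's FileSystem API (the helper and variable names below are made up for illustration):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class WalScan {
        // Look for a WAL by name under each per-region-server directory in walDir.
        static Path findWal(FileSystem fs, Path walDir, String walName) throws IOException {
            for (FileStatus serverDir : fs.listStatus(walDir)) {
                Path candidate = new Path(serverDir.getPath(), walName);
                if (fs.exists(candidate)) {
                    return candidate; // found the relocated log
                }
            }
            return null; // not found; the caller decides how to recover
        }
    }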
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index fdf7d89..a592a79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -26,7 +26,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -34,6 +36,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -50,15 +53,19 @@ import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,7 +96,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected ReplicationQueueInfo replicationQueueInfo;
// The manager of all sources to which we ping back our progress
- protected ReplicationSourceManager manager;
+ ReplicationSourceManager manager;
// Should we stop everything?
protected Server server;
// How long should we sleep for each retry
@@ -133,8 +140,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();
- private AtomicLong totalBufferUsed;
-
public static final String WAIT_ON_ENDPOINT_SECONDS =
"hbase.replication.wait.on.endpoint.seconds";
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
@@ -186,7 +191,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
* @param metrics metrics for replication source
*/
@Override
- public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException {
@@ -215,7 +220,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
- this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;
this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
@@ -386,11 +390,11 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
- return replicationPeer.getPeerConfig().isSerial()
- ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
- this, walGroupId)
- : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
- this, walGroupId);
+ return replicationPeer.getPeerConfig().isSerial() ?
+ new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
+ this, walGroupId) :
+ new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
+ this, walGroupId);
}
/**
@@ -416,7 +420,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
t.getName());
manager.refreshSources(peerId);
break;
- } catch (IOException e1) {
+ } catch (ReplicationException | IOException e1) {
LOG.error("Replication sources refresh failed.", e1);
sleepForRetries("Sleeping before try refreshing sources again",
maxRetriesMultiplier);
@@ -431,11 +435,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
@Override
- public ReplicationSourceManager getSourceManager() {
- return this.manager;
- }
-
- @Override
public void tryThrottle(int batchSize) throws InterruptedException {
checkBandwidthChangeAndResetThrottler();
if (throttler.isEnabled()) {
@@ -752,7 +751,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
- long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
+ long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
// Record the new buffer usage
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
@@ -788,4 +787,137 @@ public class ReplicationSource implements ReplicationSourceInterface {
public String logPeerId(){
return "peerId=" + this.getPeerId() + ",";
}
+
+ @Override
+ public void setWALPosition(WALEntryBatch entryBatch) {
+ String fileName = entryBatch.getLastWalPath().getName();
+ interruptOrAbortWhenFail(() -> this.queueStorage
+ .setWALPosition(server.getServerName(), getQueueId(), fileName,
+ entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+ }
+
+ @Override
+ public void cleanOldWALs(String log, boolean inclusive) {
+ NavigableSet<String> walsToRemove = getWalsToRemove(log, inclusive);
+ if (walsToRemove.isEmpty()) {
+ return;
+ }
+ // cleanOldWALs may spend some time, especially for sync replication where we may want to
+ // remove remote wals as the remote cluster may already be down, so we do it outside the
+ // lock to avoid blocking preLogRoll
+ cleanOldWALs(walsToRemove);
+ }
+
+ private NavigableSet<String> getWalsToRemove(String log, boolean inclusive) {
+ NavigableSet<String> walsToRemove = new TreeSet<>();
+ String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+ try {
+ this.queueStorage.getWALsInQueue(this.server.getServerName(), getQueueId()).forEach(wal -> {
+ LOG.debug("getWalsToRemove wal {}", wal);
+ String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+ if (walPrefix.equals(logPrefix)) {
+ walsToRemove.add(wal);
+ }
+ });
+ } catch (ReplicationException e) {
+ // Just log the exception here, as the recovered replication source will try to clean up again.
+ LOG.warn("Failed to read wals in queue {}", getQueueId(), e);
+ }
+ return walsToRemove.headSet(log, inclusive);
+ }
+
+ private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
+ throws IOException {
+ Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+ FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+ for (String wal : wals) {
+ Path walFile = new Path(remoteWALDirForPeer, wal);
+ try {
+ if (!fs.delete(walFile, false) && fs.exists(walFile)) {
+ throw new IOException("Can not delete " + walFile);
+ }
+ } catch (FileNotFoundException e) {
+ // Just ignore since this means the file has already been deleted.
+ // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting a
+ // nonexistent file, so here we deal with both cases, i.e., check the return value of
+ // FileSystem.delete, and also catch FNFE.
+ LOG.debug("The remote wal {} has already been deleted?", walFile, e);
+ }
+ }
+ }
+
+ private void cleanOldWALs(NavigableSet<String> wals) {
+ LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
+ // The intention here is that we want to delete the remote wal files ASAP, as they may affect
+ // the failover time if you want to transition the remote cluster from S to A. And the infinite
+ // retry is not a problem: if we cannot contact the remote HDFS cluster, then usually we cannot
+ // contact the HBase cluster either, so replication would be blocked anyway.
+ if (isSyncReplication()) {
+ String peerId = getPeerId();
+ String remoteWALDir = replicationPeer.getPeerConfig().getRemoteWALDir();
+ // Filter out the wals that need to be removed from the remote directory. Their names should
+ // be in the special format, and the peer id in each name should match the peer id of this
+ // replication source.
+ List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
+ .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
+ .collect(Collectors.toList());
+ LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
+ remoteWALDir, remoteWals);
+ if (!remoteWals.isEmpty()) {
+ for (int sleepMultiplier = 0;;) {
+ try {
+ removeRemoteWALs(peerId, remoteWALDir, remoteWals);
+ break;
+ } catch (IOException e) {
+ LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
+ peerId);
+ }
+ if (!isSourceActive()) {
+ // skip the following operations
+ return;
+ }
+ if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
+ sleepMultiplier, maxRetriesMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ }
+ }
+ for (String wal : wals) {
+ interruptOrAbortWhenFail(
+ () -> this.queueStorage.removeWAL(server.getServerName(), getQueueId(), wal));
+ }
+ }
+
+ public void cleanUpHFileRefs(List<String> files) {
+ interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(getPeerId(), files));
+ }
+
+ @FunctionalInterface
+ private interface ReplicationQueueOperation {
+ void exec() throws ReplicationException;
+ }
+
+ /**
+ * Refreshing the replication source terminates the old source first, which interrupts the
+ * source thread. We need to handle the interrupt instead of aborting the region server.
+ */
+ private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
+ try {
+ op.exec();
+ } catch (ReplicationException e) {
+ if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
+ && e.getCause().getCause() != null && e.getCause()
+ .getCause() instanceof InterruptedException) {
+ // A ReplicationRuntimeException (a RuntimeException) is thrown out here. The reason is
+ // that the thread was interrupted deep down in the stack; the exception should bypass the
+ // following processing logic and propagate to the topmost layer, which can handle it
+ // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
+ throw new ReplicationRuntimeException(
+ "Thread is interrupted, the replication source may be terminated",
+ e.getCause().getCause());
+ }
+ server.abort("Failed to operate on replication queue", e);
+ }
+ }
}
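interruptOrAbortWhenFail, moved into ReplicationSource above, inspects a nested cause chain to tell a shutdown interrupt apart from a genuine failure. The real check is narrow (a ZooKeeper SystemErrorException wrapping an InterruptedException); the standalone sketch below generalizes it to any buried interrupt, with placeholder exception types:

    // Illustrative sketch of the "rethrow on interrupt, abort otherwise" decision.
    final class QueueOps {
        @FunctionalInterface
        interface QueueOperation {
            void exec() throws Exception; // placeholder for ReplicationException
        }

        static void interruptOrFail(QueueOperation op) {
            try {
                op.exec();
            } catch (Exception e) {
                // Walk the cause chain: an InterruptedException buried below means the
                // source is being terminated, so propagate unchecked instead of aborting.
                for (Throwable t = e.getCause(); t != null; t = t.getCause()) {
                    if (t instanceof InterruptedException) {
                        throw new RuntimeException("source terminated during queue op", t);
                    }
                }
                throw new IllegalStateException("abort: queue operation failed", e); // abort path
            }
        }
    }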
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 352cdd3..77bba90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -41,15 +41,15 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface ReplicationSourceInterface {
/**
* Initializer for the source
- * @param conf the configuration to use
- * @param fs the file system to use
- * @param manager the manager to use
+ *
+ * @param conf the configuration to use
+ * @param fs the file system to use
* @param server the server for this region server
*/
- void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
- ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
- MetricsSource metrics) throws IOException;
+ void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+ ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+ String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+ MetricsSource metrics) throws IOException;
/**
* Add a log to the list of logs to replicate
@@ -146,11 +146,6 @@ public interface ReplicationSourceInterface {
ReplicationEndpoint getReplicationEndpoint();
/**
- * @return the replication source manager
- */
- ReplicationSourceManager getSourceManager();
-
- /**
* @return the wal file length provider
*/
WALFileLengthProvider getWALFileLengthProvider();
@@ -196,14 +191,16 @@ public interface ReplicationSourceInterface {
ReplicationQueueStorage getReplicationQueueStorage();
/**
- * Log the current position to storage. Also clean old logs from the replication queue.
- * Use to bypass the default call to
- * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface,
- * WALEntryBatch)} whem implementation does not need to persist state to backing storage.
- * @param entryBatch the wal entry batch we just shipped
- * @return The instance of queueStorage used by this ReplicationSource.
+ * Persist the current WAL position to {@link ReplicationQueueStorage}.
+ * @param entryBatch the wal entry batch we just shipped
*/
- default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
- getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
- }
+ void setWALPosition(WALEntryBatch entryBatch);
+
+ /**
+ * Cleans a WAL and all older WALs from the replication queue. Called when we are sure that a
+ * WAL is closed and has no more entries.
+ * @param walName the name of the WAL
+ * @param inclusive whether we should also remove the given WAL
+ */
+ void cleanOldWALs(String walName, boolean inclusive);
}
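The cleanOldWALs contract ("the given WAL and all older ones") is implemented in getWalsToRemove above with NavigableSet.headSet(log, inclusive): WAL names carry a timestamp suffix, so lexicographic order within one wal group is age order. A quick runnable illustration (the file names are invented):

    import java.util.NavigableSet;
    import java.util.TreeSet;

    final class HeadSetDemo {
        public static void main(String[] args) {
            NavigableSet<String> wals = new TreeSet<>();
            wals.add("rs1%2C16020.1600000001");
            wals.add("rs1%2C16020.1600000002");
            wals.add("rs1%2C16020.1600000003");
            // Strictly older than the given WAL:
            System.out.println(wals.headSet("rs1%2C16020.1600000002", false));
            // -> [rs1%2C16020.1600000001]
            // The given WAL too, once it is closed and fully shipped:
            System.out.println(wals.headSet("rs1%2C16020.1600000002", true));
            // -> [rs1%2C16020.1600000001, rs1%2C16020.1600000002]
        }
    }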
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 00ee6a5..717b1ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -17,10 +17,8 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -42,7 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -64,17 +61,14 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -339,7 +333,7 @@ public class ReplicationSourceManager implements ReplicationListener {
WALFileLengthProvider walFileLengthProvider =
this.walFactory.getWALProvider() != null?
this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty();
- src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
+ src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueId, clusterId,
walFileLengthProvider, new MetricsSource(queueId));
return src;
}
@@ -542,29 +536,6 @@ public class ReplicationSourceManager implements ReplicationListener {
void exec() throws ReplicationException;
}
- /**
- * Refresh replication source will terminate the old source first, then the source thread will be
- * interrupted. Need to handle it instead of abort the region server.
- */
- private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
- try {
- op.exec();
- } catch (ReplicationException e) {
- if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
- && e.getCause().getCause() != null && e.getCause()
- .getCause() instanceof InterruptedException) {
- // ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is
- // that thread is interrupted deep down in the stack, it should pass the following
- // processing logic and propagate to the most top layer which can handle this exception
- // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
- throw new ReplicationRuntimeException(
- "Thread is interrupted, the replication source may be terminated",
- e.getCause().getCause());
- }
- server.abort("Failed to operate on replication queue", e);
- }
- }
-
private void abortWhenFail(ReplicationQueueOperation op) {
try {
op.exec();
@@ -590,106 +561,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- /**
- * This method will log the current position to storage. And also clean old logs from the
- * replication queue.
- * @param source the replication source
- * @param entryBatch the wal entry batch we just shipped
- */
- public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
- WALEntryBatch entryBatch) {
- String fileName = entryBatch.getLastWalPath().getName();
- interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
- source.getQueueId(), fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
- cleanOldLogs(fileName, entryBatch.isEndOfFile(), source);
- }
-
- /**
- * Cleans a log file and all older logs from replication queue. Called when we are sure that a log
- * file is closed and has no more entries.
- * @param log Path to the log
- * @param inclusive whether we should also remove the given log file
- * @param source the replication source
- */
- void cleanOldLogs(String log, boolean inclusive,
- ReplicationSourceInterface source) {
- NavigableSet<String> walsToRemove;
- synchronized (this.latestPaths) {
- walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive);
- }
- if (walsToRemove.isEmpty()) {
- return;
- }
- // cleanOldLogs may spend some time, especially for sync replication where we may want to
- // remove remote wals as the remote cluster may have already been down, so we do it outside
- // the lock to avoid block preLogRoll
- cleanOldLogs(walsToRemove, source);
- }
-
- private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
- throws IOException {
- Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
- FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
- for (String wal : wals) {
- Path walFile = new Path(remoteWALDirForPeer, wal);
- try {
- if (!fs.delete(walFile, false) && fs.exists(walFile)) {
- throw new IOException("Can not delete " + walFile);
- }
- } catch (FileNotFoundException e) {
- // Just ignore since this means the file has already been deleted.
- // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting an
- // inexistent file, so here we deal with both, i.e, check the return value of the
- // FileSystem.delete, and also catch FNFE.
- LOG.debug("The remote wal {} has already been deleted?", walFile, e);
- }
- }
- }
-
- private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
- LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
- // The intention here is that, we want to delete the remote wal files ASAP as it may effect the
- // failover time if you want to transit the remote cluster from S to A. And the infinite retry
- // is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can
- // not contact with the HBase cluster either, so the replication will be blocked either.
- if (source.isSyncReplication()) {
- String peerId = source.getPeerId();
- String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
- // Filter out the wals need to be removed from the remote directory. Its name should be the
- // special format, and also, the peer id in its name should match the peer id for the
- // replication source.
- List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
- .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
- .collect(Collectors.toList());
- LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
- remoteWALDir, remoteWals);
- if (!remoteWals.isEmpty()) {
- for (int sleepMultiplier = 0;;) {
- try {
- removeRemoteWALs(peerId, remoteWALDir, remoteWals);
- break;
- } catch (IOException e) {
- LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
- peerId);
- }
- if (!source.isSourceActive()) {
- // skip the following operations
- return;
- }
- if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
- sleepMultiplier, maxRetriesMultiplier)) {
- sleepMultiplier++;
- }
- }
- }
- }
- String queueId = source.getQueueId();
- for (String wal : wals) {
- interruptOrAbortWhenFail(
- () -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal));
- }
- }
-
// public because we call it in TestReplicationEmptyWALRecovery
public void preLogRoll(Path newLog) throws IOException {
String logName = newLog.getName();
@@ -1098,10 +969,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- public void cleanUpHFileRefs(String peerId, List<String> files) {
- interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
- }
-
int activeFailoverTaskCount() {
return executor.getActiveCount();
}
@@ -1110,6 +977,16 @@ public class ReplicationSourceManager implements ReplicationListener {
return this.globalMetrics;
}
+ @InterfaceAudience.Private
+ Server getServer() {
+ return this.server;
+ }
+
+ @InterfaceAudience.Private
+ ReplicationQueueStorage getQueueStorage() {
+ return this.queueStorage;
+ }
+
/**
* Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
* Create it once only. If exists already, use the existing one.
@@ -1161,7 +1038,7 @@ public class ReplicationSourceManager implements ReplicationListener {
CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
this.clusterId.toString());
final ReplicationSourceInterface crs = new CatalogReplicationSource();
- crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
+ crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
// Add listener on the provider so we can pick up the WAL to replicate on roll.
WALActionsListener listener = new WALActionsListener() {
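The remote-WAL deletion code removed from the manager above (and re-added to ReplicationSource) retries forever with a capped backoff, giving up only when the source goes inactive; ReplicationUtils.sleepForRetries returns true while the multiplier is still below the cap. A hedged standalone sketch of that loop shape, with invented names:

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    final class RetryLoop {
        interface IoTask {
            void run() throws IOException;
        }

        // Retry forever with capped linear backoff; stop early if the owner shuts down.
        static void retry(IoTask task, BooleanSupplier stillActive, long sleepMillis,
                int maxMultiplier) throws InterruptedException {
            for (int sleepMultiplier = 0;;) {
                try {
                    task.run();
                    return; // success
                } catch (IOException e) {
                    // log and fall through to retry; transient remote outages are expected
                }
                if (!stillActive.getAsBoolean()) {
                    return; // source is shutting down; skip the remaining work
                }
                TimeUnit.MILLISECONDS.sleep(sleepMillis * Math.max(1, sleepMultiplier));
                if (sleepMultiplier < maxMultiplier) {
                    sleepMultiplier++;
                }
            }
        }
    }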
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 098ba02..16f4994 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -239,12 +239,6 @@ public class ReplicationSourceShipper extends Thread {
}
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
- String peerId = source.getPeerId();
- if (peerId.contains("-")) {
- // peerClusterZnode will be in the form peerId + "-" + rsZNode.
- // A peerId will not have "-" in its name, see HBASE-11394
- peerId = peerId.split("-")[0];
- }
List<Cell> cells = edit.getCells();
int totalCells = cells.size();
for (int i = 0; i < totalCells; i++) {
@@ -255,7 +249,7 @@ public class ReplicationSourceShipper extends Thread {
int totalStores = stores.size();
for (int j = 0; j < totalStores; j++) {
List<String> storeFileList = stores.get(j).getStoreFileList();
- source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
+ source.cleanUpHFileRefs(storeFileList);
source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
}
}
@@ -267,10 +261,11 @@ public class ReplicationSourceShipper extends Thread {
// if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
// record on zk, so let's call it. The last wal position may be zero if end of file is true and
// there is no entry in the batch. It is OK because the queue storage will ignore the zero
- // position and the file will be removed soon in cleanOldLogs.
- if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
- batch.getLastWalPosition() != currentPosition) {
- source.logPositionAndCleanOldLogs(batch);
+ // position and the file will be removed soon in cleanOldWALs.
+ if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
+ || batch.getLastWalPosition() != currentPosition) {
+ source.setWALPosition(batch);
+ source.cleanOldWALs(batch.getLastWalPath().getName(), batch.isEndOfFile());
updated = true;
}
// if end of file is true, then we can just skip to the next file in queue.
@@ -372,8 +367,8 @@ public class ReplicationSourceShipper extends Thread {
LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
totalToDecrement.longValue());
}
- long newBufferUsed = source.getSourceManager().getTotalBufferUsed()
+ long newBufferUsed = source.manager.getTotalBufferUsed()
.addAndGet(-totalToDecrement.longValue());
- source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
}
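The shipper change above replaces the single logPositionAndCleanOldLogs call with setWALPosition plus cleanOldWALs, under the same guard as before: persist only when the batch hit end of file, moved to a new WAL, or advanced the offset. A compact, illustrative restatement of that guard (simplified types, not the HBase signature):

    final class PositionGuard {
        // True when the shipped batch is worth persisting: the reader finished the
        // file, switched WALs, or advanced within the current WAL. A zero position at
        // end of file is fine: queue storage ignores it and cleanOldWALs removes the
        // file shortly afterwards anyway.
        static boolean shouldUpdate(boolean endOfFile, String batchWal, long batchPos,
                String currentWal, long currentPos) {
            return endOfFile || !batchWal.equals(currentWal) || batchPos != currentPos;
        }
    }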
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 301a9e8..fdbc7ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -25,7 +25,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -75,8 +74,6 @@ class ReplicationSourceWALReader extends Thread {
//Indicates whether this particular worker is running
private boolean isReaderRunning = true;
- private AtomicLong totalBufferUsed;
- private long totalBufferQuota;
private final String walGroupId;
/**
@@ -104,8 +101,6 @@ class ReplicationSourceWALReader extends Thread {
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
- this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
- this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
@@ -283,9 +278,10 @@ class ReplicationSourceWALReader extends Thread {
//returns false if we've already exceeded the global quota
private boolean checkQuota() {
// try not to go over total quota
- if (totalBufferUsed.get() > totalBufferQuota) {
+ if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
- this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
+ this.source.getPeerId(), source.manager.getTotalBufferUsed().get(),
+ source.manager.getTotalBufferLimit());
Threads.sleep(sleepForRetries);
return false;
}
@@ -414,10 +410,10 @@ class ReplicationSourceWALReader extends Thread {
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(long size) {
- long newBufferUsed = totalBufferUsed.addAndGet(size);
+ long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size);
// Record the new buffer usage
- this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
- return newBufferUsed >= totalBufferQuota;
+ source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ return newBufferUsed >= source.manager.getTotalBufferLimit();
}
/**
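With the reader's cached totalBufferUsed/totalBufferQuota fields gone, every check above goes through the manager's shared counter: readers reserve an entry's size before queueing it and stop batching once the limit is crossed, while shippers give the space back after replicating. A minimal sketch of that accounting, assuming one global AtomicLong and a fixed limit:

    import java.util.concurrent.atomic.AtomicLong;

    final class BufferQuota {
        private final AtomicLong used = new AtomicLong(0); // shared across all readers
        private final long limit;

        BufferQuota(long limit) {
            this.limit = limit;
        }

        // Reader side: reserve space for an entry; true means the quota is exhausted
        // and the current batch should be pushed out before reading further.
        boolean acquire(long size) {
            return used.addAndGet(size) >= limit;
        }

        // Shipper side: release the space once the batch has been replicated.
        void release(long size) {
            used.addAndGet(-size);
        }

        // Gate for starting a new read when usage already exceeds the limit.
        boolean overLimit() {
            return used.get() > limit;
        }
    }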
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 4f96c96..591b44d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* Holds a batch of WAL entries to replicate, along with some statistics
*/
@InterfaceAudience.Private
-class WALEntryBatch {
+public class WALEntryBatch {
// used by recovered replication queue to indicate that all the entries have been read.
public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 4f656b1..42445a6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -38,7 +39,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/
public class ReplicationSourceDummy implements ReplicationSourceInterface {
- private ReplicationSourceManager manager;
private ReplicationPeer replicationPeer;
private String peerClusterId;
private Path currentPath;
@@ -47,11 +47,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
private AtomicBoolean startup = new AtomicBoolean(false);
@Override
- public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
- ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
- UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
- throws IOException {
- this.manager = manager;
+ public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+ ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+ throws IOException {
this.peerClusterId = peerClusterId;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
@@ -134,11 +133,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
- public ReplicationSourceManager getSourceManager() {
- return manager;
- }
-
- @Override
public void tryThrottle(int batchSize) throws InterruptedException {
}
@@ -162,6 +156,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
+ public void setWALPosition(WALEntryBatch entryBatch) {
+ }
+
+ @Override
+ public void cleanOldWALs(String walName, boolean inclusive) {
+ }
+
+ @Override
public ReplicationPeer getPeer() {
return replicationPeer;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 86a71c9..94826ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -139,7 +139,7 @@ public class TestReplicationSource {
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+ rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
@@ -173,11 +173,11 @@ public class TestReplicationSource {
thenReturn(DoNothingReplicationEndpoint.class.getName());
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
- Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+ Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, manager, null, mockPeer, rss, queueId,
+ rs.init(conf, null, null, manager, null, mockPeer, rss, queueId,
uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
@@ -265,7 +265,7 @@ public class TestReplicationSource {
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
- source.init(testConf, null, manager, null, mockPeer, null, "testPeer",
+ source.init(testConf, null, null, manager, null, mockPeer, null, "testPeer",
null, p -> OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(
@@ -289,7 +289,7 @@ public class TestReplicationSource {
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
- source.init(testConf, null, mockManager, null, mockPeer, null,
+ source.init(testConf, null, null, mockManager, null, mockPeer, null,
"testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
conf, null, 0, null, source, null);
@@ -315,7 +315,7 @@ public class TestReplicationSource {
reader.addEntryToBatch(batch, mockEntry);
reader.entryBatchQueue.put(batch);
source.terminate("test");
- assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
+ assertEquals(0, source.manager.getTotalBufferUsed().get());
}
/**
@@ -524,7 +524,7 @@ public class TestReplicationSource {
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+ rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
return rss;
}
@@ -624,7 +624,7 @@ public class TestReplicationSource {
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
ReplicationSource source = new ReplicationSource();
- source.init(conf, null, manager, null, mockPeer, rss, id, null,
+ source.init(conf, null, null, manager, null, mockPeer, rss, id, null,
p -> OptionalLong.empty(), metrics);
final Path log1 = new Path(logDir, "log-walgroup-a.8");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 2e35ef4..597349a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
@@ -334,8 +335,9 @@ public abstract class TestReplicationSourceManager {
when(source.getQueueId()).thenReturn("1");
when(source.isRecovered()).thenReturn(false);
when(source.isSyncReplication()).thenReturn(false);
- manager.logPositionAndCleanOldLogs(source,
- new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
+ WALEntryBatch batch = new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath());
+ source.setWALPosition(batch);
+ source.cleanOldWALs(batch.getLastWalPath().getName(), batch.isEndOfFile());
wal.appendData(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
@@ -409,11 +411,10 @@ public abstract class TestReplicationSourceManager {
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
- ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
- when(source.getQueueId()).thenReturn(id);
- when(source.isRecovered()).thenReturn(true);
- when(source.isSyncReplication()).thenReturn(false);
- manager.cleanOldLogs(file2, false, source);
+ ReplicationSourceInterface source = new ReplicationSource();
+ source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"),
+ manager.getServer(), id, null, p -> OptionalLong.empty(), null);
+ source.cleanOldWALs(file2, false);
// log1 should be deleted
assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
}
@@ -590,19 +591,15 @@ public abstract class TestReplicationSourceManager {
}
}
- private ReplicationSourceInterface mockReplicationSource(String peerId) {
- ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
- when(source.getPeerId()).thenReturn(peerId);
- when(source.getQueueId()).thenReturn(peerId);
- when(source.isRecovered()).thenReturn(false);
- when(source.isSyncReplication()).thenReturn(true);
+ private ReplicationPeer mockReplicationPeerForSyncReplication(String peerId) {
ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
when(config.getRemoteWALDir())
.thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+ when(config.isSyncReplication()).thenReturn(true);
ReplicationPeer peer = mock(ReplicationPeer.class);
when(peer.getPeerConfig()).thenReturn(config);
- when(source.getPeer()).thenReturn(peer);
- return source;
+ when(peer.getId()).thenReturn(peerId);
+ return peer;
}
@Test
@@ -631,13 +628,19 @@ public abstract class TestReplicationSourceManager {
manager.preLogRoll(wal);
manager.postLogRoll(wal);
- ReplicationSourceInterface source = mockReplicationSource(peerId2);
- manager.cleanOldLogs(walName, true, source);
+ ReplicationSourceInterface source = new ReplicationSource();
+ source.init(conf, fs, null, manager, manager.getQueueStorage(),
+ mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), peerId2, null,
+ p -> OptionalLong.empty(), null);
+ source.cleanOldWALs(walName, true);
// still there if peer id does not match
assertTrue(fs.exists(remoteWAL));
- source = mockReplicationSource(slaveId);
- manager.cleanOldLogs(walName, true, source);
+ source = new ReplicationSource();
+ source.init(conf, fs, null, manager, manager.getQueueStorage(),
+ mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), slaveId, null,
+ p -> OptionalLong.empty(), null);
+ source.cleanOldWALs(walName, true);
assertFalse(fs.exists(remoteWAL));
} finally {
removePeerAndWait(peerId2);
@@ -814,11 +817,10 @@ public abstract class TestReplicationSourceManager {
static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
- @Override
- public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
- ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
- UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
- throws IOException {
+ @Override public void init(Configuration conf, FileSystem fs, Path walDir,
+ ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp,
+ Server server, String peerClusterId, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
throw new IOException("Failing deliberately");
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 9c6fafc..ca78cf0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -383,23 +383,27 @@ public class TestWALEntryStream {
}
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
- ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
- when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- when(mockSourceManager.getTotalBufferLimit()).thenReturn(
- (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
- when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
- MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
- MetricsReplicationGlobalSourceSource.class);
- when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
+ source.manager = mockReplicationSourceManager();
return source;
}
+ private ReplicationSourceManager mockReplicationSourceManager() {
+ ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+ MetricsReplicationGlobalSourceSource globalMetrics =
+ Mockito.mock(MetricsReplicationGlobalSourceSource.class);
+ when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
+ when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ when(mockSourceManager.getTotalBufferLimit())
+ .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+ return mockSourceManager;
+ }
+
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
ReplicationSource source = mockReplicationSource(recovered, conf);
when(source.isPeerEnabled()).thenReturn(true);