You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/10/22 06:56:20 UTC
[hbase] 06/07: HBASE-24998 Introduce a
ReplicationSourceOverallController interface and decouple
ReplicationSourceManager and ReplicationSource (#2364)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 4f77158eaa8cf99c5d1b8b56ca1736287bbc6f5f
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Sun Sep 20 09:02:53 2020 +0800
HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364)
Signed-off-by: meiyi <my...@gmail.com>
---
.../java/org/apache/hadoop/hbase/HConstants.java | 2 +
.../hbase/replication/ReplicationListener.java | 2 +-
.../replication/ReplicationSourceController.java | 31 +++--
.../regionserver/RecoveredReplicationSource.java | 18 ++-
.../regionserver/ReplicationSource.java | 31 ++---
.../regionserver/ReplicationSourceInterface.java | 25 ++--
.../regionserver/ReplicationSourceManager.java | 141 +++++++++++----------
.../regionserver/ReplicationSourceWALReader.java | 13 +-
.../hbase/replication/ReplicationSourceDummy.java | 21 ++-
.../regionserver/TestReplicationSourceManager.java | 11 +-
.../regionserver/TestWALEntryStream.java | 15 ++-
11 files changed, 167 insertions(+), 143 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index e1d3de9..5ffd1d6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -979,6 +979,8 @@ public final class HConstants {
/*
* cluster replication constants.
*/
+ public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled";
+ public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false;
public static final String
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
index f040bf9..6ecbb46 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
@@ -33,5 +33,5 @@ public interface ReplicationListener {
* A region server has been removed from the local cluster
* @param regionServer the removed region server
*/
- public void regionServerRemoved(String regionServer);
+ void regionServerRemoved(String regionServer);
}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
similarity index 50%
copy from hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
index f040bf9..5bb9dd6 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,20 +17,32 @@
*/
package org.apache.hadoop.hbase.replication;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * The replication listener interface can be implemented if a class needs to subscribe to events
- * generated by the ReplicationTracker. These events include things like addition/deletion of peer
- * clusters or failure of a local region server. To receive events, the class also needs to register
- * itself with a Replication Tracker.
+ * Used to control all replication sources inside one RegionServer or ReplicationServer.
+ * Used by {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSource} or
+ * {@link RecoveredReplicationSource}.
*/
@InterfaceAudience.Private
-public interface ReplicationListener {
+public interface ReplicationSourceController {
+
+ /**
+ * Returns the maximum size in bytes of edits held in memory which are pending replication
+ * across all sources inside this RegionServer or ReplicationServer.
+ */
+ long getTotalBufferLimit();
+
+ AtomicLong getTotalBufferUsed();
+
+ MetricsReplicationGlobalSourceSource getGlobalMetrics();
/**
- * A region server has been removed from the local cluster
- * @param regionServer the removed region server
+ * Call this when the recovered replication source has replicated all WALs.
*/
- public void regionServerRemoved(String regionServer);
+ void finishRecoveredSource(RecoveredReplicationSource src);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index e3400ad..eece3c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -45,18 +46,15 @@ public class RecoveredReplicationSource extends ReplicationSource {
private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
- private Path walDir;
-
private String actualPeerId;
@Override
- public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
- ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
- MetricsSource metrics) throws IOException {
- super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
- clusterId, walFileLengthProvider, metrics);
- this.walDir = walDir;
+ public void init(Configuration conf, FileSystem fs, Path walDir,
+ ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+ ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+ super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server,
+ peerClusterZnode, clusterId, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
@@ -149,7 +147,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
void tryFinish() {
if (workerThreads.isEmpty()) {
this.getSourceMetrics().clear();
- manager.finishRecoveredSource(this);
+ controller.finishRecoveredSource(this);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 473abfd..848081a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -99,8 +100,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected Configuration conf;
protected ReplicationQueueInfo replicationQueueInfo;
- // The manager of all sources to which we ping back our progress
- ReplicationSourceManager manager;
+ protected Path walDir;
+
+ protected ReplicationSourceController controller;
// Should we stop everything?
protected Server server;
// How long should we sleep for each retry
@@ -187,23 +189,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
}
- /**
- * Instantiation method used by region servers
- * @param conf configuration to use
- * @param fs file system to use
- * @param manager replication manager to ping to
- * @param server the server for this region server
- * @param queueId the id of our replication queue
- * @param clusterId unique UUID for the cluster
- * @param metrics metrics for replication source
- */
@Override
- public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
- ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
- MetricsSource metrics) throws IOException {
+ public void init(Configuration conf, FileSystem fs, Path walDir,
+ ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+ ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.server = server;
this.conf = HBaseConfiguration.create(conf);
+ this.walDir = walDir;
this.waitOnEndpointSeconds =
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
decorateConf();
@@ -214,7 +207,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.queueStorage = queueStorage;
this.replicationPeer = replicationPeer;
- this.manager = manager;
+ this.controller = overallController;
this.fs = fs;
this.metrics = metrics;
this.clusterId = clusterId;
@@ -767,9 +760,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
- long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
+ long newBufferUsed = controller.getTotalBufferUsed().addAndGet(-batchSize);
// Record the new buffer usage
- this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 321edc2..f3bf8a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -44,14 +45,22 @@ public interface ReplicationSourceInterface {
/**
* Initializer for the source
*
- * @param conf the configuration to use
- * @param fs the file system to use
- * @param server the server for this region server
- */
- void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
- ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
- MetricsSource metrics) throws IOException;
+ * @param conf configuration to use
+ * @param fs file system to use
+ * @param walDir the directory where the WAL is located
+ * @param overallController the overall controller of all replication sources
+ * @param queueStorage the replication queue storage
+ * @param replicationPeer the replication peer
+ * @param server the server which start and run this replication source
+ * @param queueId the id of our replication queue
+ * @param clusterId unique UUID for the cluster
+ * @param walFileLengthProvider used to get the WAL length
+ * @param metrics metrics for this replication source
+ */
+ void init(Configuration conf, FileSystem fs, Path walDir,
+ ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+ ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
/**
* Add a log to the list of logs to replicate
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3212697..de9e21f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
@@ -92,7 +93,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* </ul>
*/
@InterfaceAudience.Private
-public class ReplicationSourceManager implements ReplicationListener {
+public class ReplicationSourceManager implements ReplicationListener, ReplicationSourceController {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
// all the sources that read this RS's logs and every peer only has one replication source
private final ConcurrentMap<String, ReplicationSourceInterface> sources;
@@ -126,12 +127,6 @@ public class ReplicationSourceManager implements ReplicationListener {
private AtomicLong totalBufferUsed = new AtomicLong();
- // How long should we sleep for each retry when deleting remote wal files for sync replication
- // peer.
- private final long sleepForRetries;
- // Maximum number of retries before taking bold actions when deleting remote wal files for sync
- // replication peer.
- private final int maxRetriesMultiplier;
// Total buffer size on this RegionServer for holding batched edits to be shipped.
private final long totalBufferLimit;
private final MetricsReplicationGlobalSourceSource globalMetrics;
@@ -139,6 +134,12 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
/**
+ * When replication offload is enabled, do not create replication sources and only write WALs
+ * to the replication queue storage. The replication sources will be started by ReplicationServer.
+ */
+ private final boolean replicationOffload;
+
+ /**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param queueStorage the interface for manipulating replication queues
* @param replicationPeers
@@ -186,12 +187,11 @@ public class ReplicationSourceManager implements ReplicationListener {
this.latestPaths = new HashMap<>();
this.replicationForBulkLoadDataEnabled = conf.getBoolean(
HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
- this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
- this.maxRetriesMultiplier =
- this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
this.globalMetrics = globalMetrics;
+ this.replicationOffload = conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT);
}
/**
@@ -212,6 +212,47 @@ public class ReplicationSourceManager implements ReplicationListener {
return this.executor.submit(this::adoptAbandonedQueues);
}
+ @VisibleForTesting
+ @Override
+ public AtomicLong getTotalBufferUsed() {
+ return totalBufferUsed;
+ }
+
+ @Override
+ public long getTotalBufferLimit() {
+ return totalBufferLimit;
+ }
+
+ @Override
+ public void finishRecoveredSource(RecoveredReplicationSource src) {
+ synchronized (oldsources) {
+ if (!removeRecoveredSource(src)) {
+ return;
+ }
+ }
+ LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
+ src.getStats());
+ }
+
+ @Override
+ public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+ return this.globalMetrics;
+ }
+
+ /**
+ * Clear the metrics and related replication queue of the specified old source
+ * @param src source to clear
+ */
+ private boolean removeRecoveredSource(ReplicationSourceInterface src) {
+ if (!this.oldsources.remove(src)) {
+ return false;
+ }
+ LOG.info("Done with the recovered queue {}", src.getQueueId());
+ // Delete queue from storage and memory
+ deleteQueue(src.getQueueId());
+ return true;
+ }
+
private void adoptAbandonedQueues() {
List<ServerName> currentReplicators = null;
try {
@@ -331,8 +372,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param peerId the id of the replication peer
* @return the source that was created
*/
- @VisibleForTesting
- ReplicationSourceInterface addSource(String peerId) throws IOException {
+ void addSource(String peerId) throws IOException {
ReplicationPeer peer = replicationPeers.getPeer(peerId);
ReplicationSourceInterface src = createSource(peerId, peer);
// synchronized on latestPaths to avoid missing the new log
@@ -354,8 +394,9 @@ public class ReplicationSourceManager implements ReplicationListener {
if (peerConfig.isSyncReplication()) {
syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
}
- src.startup();
- return src;
+ if (!replicationOffload) {
+ src.startup();
+ }
}
/**
@@ -373,7 +414,11 @@ public class ReplicationSourceManager implements ReplicationListener {
* </p>
* @param peerId the id of the sync replication peer
*/
- public void drainSources(String peerId) throws IOException, ReplicationException {
+ void drainSources(String peerId) throws IOException, ReplicationException {
+ if (replicationOffload) {
+ throw new ReplicationException(
+ "Should not use sync replication when replication offload is enabled");
+ }
String terminateMessage = "Sync replication peer " + peerId +
" is transiting to STANDBY. Will close the previous replication source and open a new one";
ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -430,7 +475,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* replication queue storage and only to enqueue all logs to the new replication source
* @param peerId the id of the replication peer
*/
- public void refreshSources(String peerId) throws ReplicationException, IOException {
+ void refreshSources(String peerId) throws ReplicationException, IOException {
String terminateMessage = "Peer " + peerId +
" state or config changed. Will close the previous replication source and open a new one";
ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -447,7 +492,9 @@ public class ReplicationSourceManager implements ReplicationListener {
.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
}
LOG.info("Startup replication source for " + src.getPeerId());
- src.startup();
+ if (!replicationOffload) {
+ src.startup();
+ }
List<ReplicationSourceInterface> toStartup = new ArrayList<>();
// synchronized on oldsources to avoid race with NodeFailoverWorker
@@ -470,41 +517,18 @@ public class ReplicationSourceManager implements ReplicationListener {
toStartup.add(recoveredReplicationSource);
}
}
- for (ReplicationSourceInterface replicationSource : toStartup) {
- replicationSource.startup();
- }
- }
-
- /**
- * Clear the metrics and related replication queue of the specified old source
- * @param src source to clear
- */
- private boolean removeRecoveredSource(ReplicationSourceInterface src) {
- if (!this.oldsources.remove(src)) {
- return false;
- }
- LOG.info("Done with the recovered queue {}", src.getQueueId());
- // Delete queue from storage and memory
- deleteQueue(src.getQueueId());
- return true;
- }
-
- void finishRecoveredSource(ReplicationSourceInterface src) {
- synchronized (oldsources) {
- if (!removeRecoveredSource(src)) {
- return;
+ if (!replicationOffload) {
+ for (ReplicationSourceInterface replicationSource : toStartup) {
+ replicationSource.startup();
}
}
- LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
- src.getStats());
}
/**
* Clear the metrics and related replication queue of the specified old source
* @param src source to clear
*/
- void removeSource(ReplicationSourceInterface src) {
- LOG.info("Done with the queue " + src.getQueueId());
+ private void removeSource(ReplicationSourceInterface src) {
this.sources.remove(src.getPeerId());
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
@@ -548,8 +572,7 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- // public because of we call it in TestReplicationEmptyWALRecovery
- @VisibleForTesting
+ @InterfaceAudience.Private
public void preLogRoll(Path newLog) throws IOException {
String logName = newLog.getName();
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
@@ -567,9 +590,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- // public because of we call it in TestReplicationEmptyWALRecovery
- @VisibleForTesting
- public void postLogRoll(Path newLog) throws IOException {
+ @InterfaceAudience.Private
+ public void postLogRoll(Path newLog) {
// This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources.values()) {
source.enqueueLog(newLog);
@@ -739,7 +761,9 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId());
src.enqueueLog(new Path(oldLogDir, wal));
}
- src.startup();
+ if (!replicationOffload) {
+ src.startup();
+ }
}
} catch (IOException e) {
// TODO manage it
@@ -849,19 +873,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- @VisibleForTesting
- public AtomicLong getTotalBufferUsed() {
- return totalBufferUsed;
- }
-
- /**
- * Returns the maximum size in bytes of edits held in memory which are pending replication
- * across all sources inside this RegionServer.
- */
- public long getTotalBufferLimit() {
- return totalBufferLimit;
- }
-
/**
* Get the directory where wals are archived
* @return the directory where wals are archived
@@ -967,10 +978,6 @@ public class ReplicationSourceManager implements ReplicationListener {
return executor.getActiveCount();
}
- MetricsReplicationGlobalSourceSource getGlobalMetrics() {
- return this.globalMetrics;
- }
-
@InterfaceAudience.Private
Server getServer() {
return this.server;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 22cbd97..7b7d0d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -267,10 +267,11 @@ class ReplicationSourceWALReader extends Thread {
//returns false if we've already exceeded the global quota
private boolean checkQuota() {
// try not to go over total quota
- if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) {
+ if (source.controller.getTotalBufferUsed().get() > source.controller
+ .getTotalBufferLimit()) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
- this.source.getPeerId(), source.manager.getTotalBufferUsed().get(),
- source.manager.getTotalBufferLimit());
+ this.source.getPeerId(), source.controller.getTotalBufferUsed().get(),
+ source.controller.getTotalBufferLimit());
Threads.sleep(sleepForRetries);
return false;
}
@@ -399,10 +400,10 @@ class ReplicationSourceWALReader extends Thread {
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(long size) {
- long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size);
+ long newBufferUsed = source.controller.getTotalBufferUsed().addAndGet(size);
// Record the new buffer usage
- source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
- return newBufferUsed >= source.manager.getTotalBufferLimit();
+ source.controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ return newBufferUsed >= source.controller.getTotalBufferLimit();
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index b75a7ed..66059c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -40,21 +39,21 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
public class ReplicationSourceDummy implements ReplicationSourceInterface {
private ReplicationPeer replicationPeer;
- private String peerClusterId;
+ private String queueId;
private Path currentPath;
private MetricsSource metrics;
private WALFileLengthProvider walFileLengthProvider;
private AtomicBoolean startup = new AtomicBoolean(false);
@Override
- public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
- ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
- UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
- throws IOException {
- this.peerClusterId = peerClusterId;
+ public void init(Configuration conf, FileSystem fs, Path walDir,
+ ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+ ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+ this.queueId = queueId;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
- this.replicationPeer = rp;
+ this.replicationPeer = replicationPeer;
}
@Override
@@ -96,14 +95,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public String getQueueId() {
- return peerClusterId;
+ return queueId;
}
@Override
public String getPeerId() {
- String[] parts = peerClusterId.split("-", 2);
+ String[] parts = queueId.split("-", 2);
return parts.length != 1 ?
- parts[0] : peerClusterId;
+ parts[0] : queueId;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 4b685ce..0e0353f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -43,6 +43,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -816,10 +818,11 @@ public abstract class TestReplicationSourceManager {
static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
- @Override public void init(Configuration conf, FileSystem fs, Path walDir,
- ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp,
- Server server, String peerClusterId, UUID clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+ @Override
+ public void init(Configuration conf, FileSystem fs, Path walDir,
+ ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+ ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException{
throw new IOException("Failing deliberately");
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 9410604..bafabb0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -375,19 +376,19 @@ public class TestWALEntryStream {
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
- source.manager = mockReplicationSourceManager();
+ source.controller = mockReplicationSourceController();
return source;
}
- private ReplicationSourceManager mockReplicationSourceManager() {
- ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+ private ReplicationSourceController mockReplicationSourceController() {
+ ReplicationSourceController controller = Mockito.mock(ReplicationSourceController.class);
MetricsReplicationGlobalSourceSource globalMetrics =
Mockito.mock(MetricsReplicationGlobalSourceSource.class);
- when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
- when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- when(mockSourceManager.getTotalBufferLimit())
+ when(controller.getGlobalMetrics()).thenReturn(globalMetrics);
+ when(controller.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ when(controller.getTotalBufferLimit())
.thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
- return mockSourceManager;
+ return controller;
}
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {