You are viewing a plain-text version of this content. The canonical link to the original page was a hyperlink that did not survive the plain-text conversion.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/07/09 06:56:03 UTC
[hbase] 09/12: HBASE-25071 ReplicationServer support start ReplicationSource internal (#2452)
This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit c8c85f4f4205ebcd6c7fbecf55072386c3bb842b
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Mon Nov 9 11:46:02 2020 +0800
HBASE-25071 ReplicationServer support start ReplicationSource internal (#2452)
Signed-off-by: XinSun <dd...@gmail.com>
---
.../server/replication/ReplicationServer.proto | 14 +-
.../replication/ZKReplicationQueueStorage.java | 4 +-
.../replication/ZKReplicationStorageBase.java | 4 +
.../hadoop/hbase/master/MasterRpcServices.java | 2 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 2 +-
.../replication/HBaseReplicationEndpoint.java | 14 +-
.../hbase/replication/HReplicationServer.java | 175 ++++++++++++++++++---
.../replication/ReplicationServerRpcServices.java | 15 ++
.../regionserver/RecoveredReplicationSource.java | 9 +-
.../regionserver/ReplicationSource.java | 54 ++++++-
.../regionserver/ReplicationSourceFactory.java | 2 +-
.../regionserver/ReplicationSourceInterface.java | 6 +-
.../regionserver/ReplicationSourceManager.java | 9 +-
.../hbase/replication/ReplicationSourceDummy.java | 5 +-
.../replication/TestReplicationFetchServers.java | 43 +++--
...nServer.java => TestReplicationServerSink.java} | 25 +--
.../replication/TestReplicationServerSource.java | 69 ++++++++
.../regionserver/TestReplicationSource.java | 20 +--
.../regionserver/TestReplicationSourceManager.java | 18 ++-
19 files changed, 400 insertions(+), 90 deletions(-)
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
index ed334c4..925aed4 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
@@ -24,9 +24,21 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
+import "HBase.proto";
import "server/region/Admin.proto";
+message StartReplicationSourceRequest {
+ required ServerName server_name = 1;
+ required string queue_id = 2;
+}
+
+message StartReplicationSourceResponse {
+}
+
service ReplicationServerService {
rpc ReplicateWALEntry(ReplicateWALEntryRequest)
returns(ReplicateWALEntryResponse);
-}
\ No newline at end of file
+
+ rpc StartReplicationSource(StartReplicationSourceRequest)
+ returns(StartReplicationSourceResponse);
+}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 5c480ba..08ac142 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -79,7 +79,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
* </pre>
*/
@InterfaceAudience.Private
-class ZKReplicationQueueStorage extends ZKReplicationStorageBase
+public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
implements ReplicationQueueStorage {
private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
@@ -121,7 +121,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
}
- private String getQueueNode(ServerName serverName, String queueId) {
+ public String getQueueNode(ServerName serverName, String queueId) {
return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
index 596167f..a239bf8 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
@@ -74,4 +74,8 @@ public abstract class ZKReplicationStorageBase {
throw new RuntimeException(e);
}
}
+
+ public ZKWatcher getZookeeper() {
+ return this.zookeeper;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index c677458..c17d699 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -3475,7 +3475,7 @@ public class MasterRpcServices extends RSRpcServices implements
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preListReplicationSinkServers();
}
- builder.addAllServerName(master.listReplicationSinkServers().stream()
+ builder.addAllServerName(master.getReplicationServerManager().getOnlineServersList().stream()
.map(ProtobufUtil::toServerName).collect(Collectors.toList()));
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postListReplicationSinkServers();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 72fea23..91bf9cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -273,7 +273,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
@SuppressWarnings("deprecation")
public class RSRpcServices implements HBaseRPCErrorHandler,
AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
- ConfigurationObserver, ReplicationServerService.BlockingInterface {
+ ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
/** RPC scheduler to use for the region server. */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 115df76..d17bb7f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -315,6 +315,10 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
if (!useZk || ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
useZk = false;
slaveAddresses = fetchSlavesAddresses();
+ if (slaveAddresses.isEmpty()) {
+ LOG.warn("No sinks available at peer. Try fetch sinks by using zk.");
+ useZk = true;
+ }
} else {
useZk = true;
}
@@ -322,13 +326,15 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.", ctx.getPeerId(), t);
useZk = true;
}
+
if (useZk) {
slaveAddresses = fetchSlavesAddressesByZK();
}
if (slaveAddresses.isEmpty()) {
- LOG.warn("No sinks available at peer. Will not be able to replicate");
+ LOG.warn("No sinks available at peer. Will not be able to replicate.");
}
+
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
synchronized (this) {
@@ -362,10 +368,10 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
}
private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
- if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
- return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
- } else {
+ if (fetchServersUseZk) {
return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+ } else {
+ return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index e679a98..2d0336d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -20,10 +20,19 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.lang.management.MemoryUsage;
import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -33,17 +42,30 @@ import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.ReplicationService;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.security.SecurityConstants;
+import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
@@ -65,7 +87,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatus
*/
@InterfaceAudience.Private
@SuppressWarnings({ "deprecation"})
-public class HReplicationServer extends Thread implements Server {
+public class HReplicationServer extends Thread implements Server, ReplicationSourceController {
private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class);
@@ -75,7 +97,7 @@ public class HReplicationServer extends Thread implements Server {
/**
* This servers start code.
*/
- protected final long startCode;
+ private final long startCode;
private volatile boolean stopped = false;
@@ -84,7 +106,11 @@ public class HReplicationServer extends Thread implements Server {
private AtomicBoolean abortRequested;
// flag set after we're done setting up server threads
- final AtomicBoolean online = new AtomicBoolean(false);
+ private final AtomicBoolean online = new AtomicBoolean(false);
+
+ private final int msgInterval;
+ // A sleeper that sleeps for msgInterval.
+ private final Sleeper sleeper;
/**
* The server name the Master sees us as. Its made from the hostname the
@@ -93,18 +119,22 @@ public class HReplicationServer extends Thread implements Server {
*/
private ServerName serverName;
- protected final Configuration conf;
+ private final Configuration conf;
- private ReplicationSinkService replicationSinkService;
+ // zookeeper connection and watcher
+ private final ZKWatcher zooKeeper;
- final int msgInterval;
- // A sleeper that sleeps for msgInterval.
- protected final Sleeper sleeper;
+ private final UUID clusterId;
private final int shortOperationTimeout;
- // zookeeper connection and watcher
- protected final ZKWatcher zooKeeper;
+ private HFileSystem walFs;
+ private Path walRootDir;
+
+ /**
+ * ChoreService used to schedule tasks that we want to run periodically
+ */
+ private ChoreService choreService;
// master address tracker
private final MasterAddressTracker masterAddressTracker;
@@ -112,11 +142,23 @@ public class HReplicationServer extends Thread implements Server {
/**
* The asynchronous cluster connection to be shared by services.
*/
- protected AsyncClusterConnection asyncClusterConnection;
+ private AsyncClusterConnection asyncClusterConnection;
private UserProvider userProvider;
- protected final ReplicationServerRpcServices rpcServices;
+ final ReplicationServerRpcServices rpcServices;
+
+ // Total buffer size on this RegionServer for holding batched edits to be shipped.
+ private final long totalBufferLimit;
+ private AtomicLong totalBufferUsed = new AtomicLong();
+
+ private final MetricsReplicationGlobalSourceSource globalMetrics;
+ private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
+ private final ConcurrentMap<String, ReplicationSourceInterface> sources =
+ new ConcurrentHashMap<>();
+
+ private final ReplicationQueueStorage queueStorage;
+ private final ReplicationPeers replicationPeers;
// Stub to do region server status calls against the master.
private volatile ReplicationServerStatusService.BlockingInterface rssStub;
@@ -124,12 +166,9 @@ public class HReplicationServer extends Thread implements Server {
// RPC client. Used to make the stub above that does region server status checking.
private RpcClient rpcClient;
- /**
- * ChoreService used to schedule tasks that we want to run periodically
- */
- private ChoreService choreService;
+ private ReplicationSinkService replicationSinkService;
- public HReplicationServer(final Configuration conf) throws IOException {
+ public HReplicationServer(final Configuration conf) throws Exception {
try {
this.startCode = System.currentTimeMillis();
this.conf = conf;
@@ -142,12 +181,29 @@ public class HReplicationServer extends Thread implements Server {
serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startCode);
this.userProvider = UserProvider.instantiate(conf);
+ // login the zookeeper client principal (if using security)
+ ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
+ HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
+ // login the server principal (if using secure Hadoop)
+ this.userProvider.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
+ SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, hostName);
+ // init superusers and add the server principal (if using security)
+ // or process owner as default super user.
+ Superusers.initialize(conf);
this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000);
this.sleeper = new Sleeper(this.msgInterval, this);
this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
+ this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+ HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+ this.globalMetrics =
+ CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+ .getGlobalSource();
+
+ initializeFileSystem();
+ this.choreService = new ChoreService(getName(), true);
// Some unit tests don't need a cluster, so no zookeeper at all
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
@@ -160,6 +216,12 @@ public class HReplicationServer extends Thread implements Server {
zooKeeper = null;
masterAddressTracker = null;
}
+
+ this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zooKeeper, conf);
+ this.replicationPeers =
+ ReplicationFactory.getReplicationPeers(zooKeeper, this.conf);
+ this.replicationPeers.init();
+ this.clusterId = ZKClusterId.getUUIDForCluster(zooKeeper);
this.rpcServices.start(zooKeeper);
this.choreService = new ChoreService(getName(), true);
} catch (Throwable t) {
@@ -170,6 +232,15 @@ public class HReplicationServer extends Thread implements Server {
}
}
+ private void initializeFileSystem() throws IOException {
+ // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
+ // checksum verification enabled, then automatically switch off hdfs checksum verification.
+ boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
+ CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getWALRootDir(this.conf));
+ this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
+ this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
+ }
+
public String getProcessName() {
return REPLICATION_SERVER;
}
@@ -289,6 +360,9 @@ public class HReplicationServer extends Thread implements Server {
if (this.replicationSinkService != null) {
this.replicationSinkService.stopReplicationService();
}
+ if (this.choreService != null) {
+ this.choreService.shutdown();
+ }
}
@Override
@@ -328,7 +402,7 @@ public class HReplicationServer extends Thread implements Server {
@Override
public ChoreService getChoreService() {
- return this.choreService;
+ return choreService;
}
@Override
@@ -592,4 +666,69 @@ public class HReplicationServer extends Thread implements Server {
}
return interrupted;
}
+
+ @Override
+ public long getTotalBufferLimit() {
+ return this.totalBufferLimit;
+ }
+
+ @Override
+ public AtomicLong getTotalBufferUsed() {
+ return this.totalBufferUsed;
+ }
+
+ @Override
+ public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+ return this.globalMetrics;
+ }
+
+ @Override
+ public void finishRecoveredSource(RecoveredReplicationSource src) {
+ this.sources.remove(src.getQueueId());
+ this.sourceMetrics.remove(src.getQueueId());
+ deleteQueue(src.getQueueId());
+ LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
+ src.getStats());
+ }
+
+ public void startReplicationSource(ServerName producer, String queueId)
+ throws IOException, ReplicationException {
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
+ String peerId = replicationQueueInfo.getPeerId();
+ this.replicationPeers.addPeer(peerId);
+ Path walDir =
+ new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(producer.toString()));
+ MetricsSource metrics = new MetricsSource(queueId);
+
+ ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
+ // init replication source
+ src.init(conf, walFs, walDir, this, queueStorage, replicationPeers.getPeer(peerId), this,
+ producer, queueId, clusterId, p -> OptionalLong.empty(), metrics);
+ queueStorage.getWALsInQueue(producer, queueId)
+ .forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
+ src.startup();
+ sources.put(queueId, src);
+ sourceMetrics.put(queueId, metrics);
+ }
+
+ /**
+ * Delete a complete queue of wals associated with a replication source
+ * @param queueId the id of replication queue to delete
+ */
+ private void deleteQueue(String queueId) {
+ abortWhenFail(() -> this.queueStorage.removeQueue(getServerName(), queueId));
+ }
+
+ @FunctionalInterface
+ private interface ReplicationQueueOperation {
+ void exec() throws ReplicationException;
+ }
+
+ private void abortWhenFail(ReplicationQueueOperation op) {
+ try {
+ op.exec();
+ } catch (ReplicationException e) {
+ abort("Failed to operate on replication queue", e);
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
index 15d4f8c..b8c3884 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
@@ -56,11 +56,14 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.StartReplicationSourceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.StartReplicationSourceResponse;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -321,4 +324,16 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
throw new ServiceException(ie);
}
}
+
+ @Override
+ public StartReplicationSourceResponse startReplicationSource(RpcController controller,
+ StartReplicationSourceRequest request) throws ServiceException {
+ try {
+ replicationServer.startReplicationSource(ProtobufUtil.toServerName(request.getServerName()),
+ request.getQueueId());
+ return StartReplicationSourceResponse.newBuilder().build();
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 7cb159e..147556f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -51,10 +51,11 @@ public class RecoveredReplicationSource extends ReplicationSource {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
- ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
- super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server,
- peerClusterZnode, clusterId, walFileLengthProvider, metrics);
+ ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+ throws IOException {
+ super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server, producer,
+ queueId, clusterId, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 27f2ce7..0d9ee4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -62,10 +62,13 @@ import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.zookeeper.ZKListener;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -149,6 +152,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
private int waitOnEndpointSeconds = -1;
private Thread initThread;
+ private Thread fetchWALsThread;
/**
* WALs to replicate.
@@ -186,8 +190,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
- ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+ ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+ throws IOException {
this.server = server;
this.conf = HBaseConfiguration.create(conf);
this.walDir = walDir;
@@ -219,6 +224,19 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
true);
+ if (conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT)) {
+ if (queueStorage instanceof ZKReplicationQueueStorage) {
+ ZKReplicationQueueStorage zkQueueStorage = (ZKReplicationQueueStorage) queueStorage;
+ zkQueueStorage.getZookeeper().registerListener(
+ new ReplicationQueueListener(this, zkQueueStorage, producer, queueId, walDir));
+ LOG.info("Register a ZKListener to track the WALs from {}'s replication queue, queueId={}",
+ producer, queueId);
+ } else {
+ throw new UnsupportedOperationException(
+ "hbase.replication.offload.enabled=true only support ZKReplicationQueueStorage");
+ }
+ }
LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
replicationPeer.getId(), this.currentBandwidth);
}
@@ -928,4 +946,36 @@ public class ReplicationSource implements ReplicationSourceInterface {
server.abort("Failed to operate on replication queue", e);
}
}
+
+ /**
+ * Tracks changes to the WALs in the replication queue.
+ */
+ public static class ReplicationQueueListener extends ZKListener {
+
+ private final ReplicationSource source;
+ private final String queueNode;
+ private final Path walDir;
+
+ public ReplicationQueueListener(ReplicationSource source,
+ ZKReplicationQueueStorage zkQueueStorage, ServerName producer, String queueId, Path walDir) {
+ super(zkQueueStorage.getZookeeper());
+ this.source = source;
+ this.queueNode = zkQueueStorage.getQueueNode(producer, queueId);
+ this.walDir = walDir;
+ }
+
+ @Override
+ public synchronized void nodeChildrenChanged(String path) {
+ if (path.equals(queueNode)) {
+ LOG.info("Detected change to the WALs in the replication queue {}", queueNode);
+ try {
+ ZKUtil.listChildrenNoWatch(watcher, queueNode).forEach(walName -> {
+ source.enqueueLog(new Path(walDir, walName));
+ });
+ } catch (KeeperException e) {
+ LOG.warn("Failed to read WALs in the replication queue {}", queueNode, e);
+ }
+ }
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 8863f14..56c8ee4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -35,7 +35,7 @@ public final class ReplicationSourceFactory {
private ReplicationSourceFactory() {}
- static ReplicationSourceInterface create(Configuration conf, String queueId) {
+ public static ReplicationSourceInterface create(Configuration conf, String queueId) {
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
ReplicationSourceInterface src;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 296bd27..461276e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -50,6 +50,7 @@ public interface ReplicationSourceInterface {
* @param queueStorage the replication queue storage
* @param replicationPeer the replication peer
* @param server the server which start and run this replication source
+ * @param producer the name of region server which produce WAL to the replication queue
* @param queueId the id of our replication queue
* @param clusterId unique UUID for the cluster
* @param walFileLengthProvider used to get the WAL length
@@ -57,8 +58,9 @@ public interface ReplicationSourceInterface {
*/
void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
- ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
+ ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+ throws IOException;
/**
* Add a log to the list of logs to replicate
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b6cb087..3dc2d12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -304,8 +304,8 @@ public class ReplicationSourceManager implements ReplicationSourceController {
WALFileLengthProvider walFileLengthProvider =
this.walFactory.getWALProvider() != null?
this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty();
- src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueId, clusterId,
- walFileLengthProvider, new MetricsSource(queueId));
+ src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, server.getServerName(),
+ queueId, clusterId, walFileLengthProvider, new MetricsSource(queueId));
return src;
}
@@ -925,8 +925,9 @@ public class ReplicationSourceManager implements ReplicationSourceController {
CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
this.clusterId.toString());
final ReplicationSourceInterface crs = new CatalogReplicationSource();
- crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
- clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
+ crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer, server,
+ server.getServerName(), peer.getId(), clusterId, walProvider.getWALFileLengthProvider(),
+ new MetricsSource(peer.getId()));
// Add listener on the provider so we can pick up the WAL to replicate on roll.
WALActionsListener listener = new WALActionsListener() {
@Override public void postLogRoll(Path oldPath, Path newPath) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 8a32e94..8f28dee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -48,8 +48,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
- ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+ ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+ throws IOException {
this.queueId = queueId;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
index 9ceacee..db4152e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.replication;
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,13 +38,14 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
@@ -53,11 +54,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationFetchServers extends TestReplicationBase {
+ private static final Logger LOG = LoggerFactory.getLogger(TestReplicationFetchServers.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationFetchServers.class);
+ private static HReplicationServer replicationServer;
+
private static AtomicBoolean fetchFlag = new AtomicBoolean(false);
public static class MyObserver implements MasterCoprocessor, MasterObserver {
@@ -77,6 +81,17 @@ public class TestReplicationFetchServers extends TestReplicationBase {
public static void setUpBeforeClass() throws Exception {
CONF2.set(MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
TestReplicationBase.setUpBeforeClass();
+ replicationServer = new HReplicationServer(CONF2);
+ replicationServer.start();
+ UTIL2.waitFor(60000, () -> replicationServer.isOnline());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ // Stop the replication server (built from CONF2) before running the base class teardown.
+ // It is still null if setUpBeforeClass failed before creating it, and the base teardown
+ // must run even if stopping the replication server throws.
+ try {
+ if (replicationServer != null && !replicationServer.isStopped()) {
+ replicationServer.stop("test");
+ }
+ } finally {
+ TestReplicationBase.tearDownAfterClass();
+ }
}
@Before
@@ -85,15 +100,23 @@ public class TestReplicationFetchServers extends TestReplicationBase {
}
@Test
- public void testMasterListReplicationPeerServers() throws IOException, ServiceException {
+ public void testMasterListReplicationPeerServers() throws IOException {
AsyncClusterConnection conn = UTIL2.getAsyncConnection();
ServerName master = UTIL2.getAdmin().getMaster();
- MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
- conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000));
- ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers(
- null, ListReplicationSinkServersRequest.newBuilder().build());
- List<ServerName> servers = ProtobufUtil.toServerNameList(resp.getServerNameList());
- assertFalse(servers.isEmpty());
+ // Poll the master until exactly one replication sink server is listed; RPC errors retry
+ UTIL2.waitFor(60000, () -> {
+ List<ServerName> reported = new ArrayList<>();
+ try {
+ MasterService.BlockingInterface stub = MasterService.newBlockingStub(
+ conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000));
+ ListReplicationSinkServersResponse response = stub.listReplicationSinkServers(null,
+ ListReplicationSinkServersRequest.newBuilder().build());
+ reported = ProtobufUtil.toServerNameList(response.getServerNameList());
+ } catch (Exception e) {
+ LOG.debug("Failed to list replication servers", e);
+ }
+ return reported.size() == 1;
+ });
assertTrue(fetchFlag.get());
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
similarity index 89%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
index 30660c6..d97667b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ReplicationServerManager;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer;
-import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -64,13 +63,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationServer {
+public class TestReplicationServerSink {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationServer.class);
+ HBaseClassTestRule.forClass(TestReplicationServerSink.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServerSink.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -126,6 +125,7 @@ public class TestReplicationServer {
if (!replicationServer.isStopped()) {
replicationServer.stop("test");
}
+ TEST_UTIL.waitFor(10000, () -> !replicationServer.isAlive());
replicationServer = null;
replicationServerName = null;
}
@@ -145,22 +145,7 @@ public class TestReplicationServer {
replicateWALEntryAndVerify(sinkPeer);
}
- /**
- * Requests region server using {@link AsyncReplicationServerAdmin}
- */
- @Test
- public void testReplicateWAL2() throws Exception {
- AsyncClusterConnection conn =
- TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
- ServerName rs = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
- .getRegionServer().getServerName();
- AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs);
-
- ReplicationServerSinkPeer sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin);
- replicateWALEntryAndVerify(sinkPeer);
- }
-
- private void replicateWALEntryAndVerify(SinkPeer sinkPeer) throws Exception {
+ private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception {
Entry[] entries = new Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java
new file mode 100644
index 0000000..843e5b1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts a replication source inside a standalone {@link HReplicationServer} for the single
+ * region server of the source cluster, then runs the small batch replication check inherited
+ * from {@link TestReplicationBase}.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationServerSource extends TestReplicationBase {
+
+ @ClassRule public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationServerSource.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServerSource.class);
+
+ private static HReplicationServer replicationServer;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL1.getConfiguration().setBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, true);
+ TestReplicationBase.setUpBeforeClass();
+ replicationServer = new HReplicationServer(UTIL1.getConfiguration());
+ replicationServer.start();
+ UTIL1.waitFor(60000, () -> replicationServer.isOnline());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ // replicationServer is still null if setUpBeforeClass failed before creating it, and the
+ // base class teardown must run even if stopping the replication server throws.
+ try {
+ if (replicationServer != null) {
+ replicationServer.stop("Tear down after test");
+ }
+ } finally {
+ TestReplicationBase.tearDownAfterClass();
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ // Only one region server is started in the source cluster, so it is the WAL producer
+ ServerName producer = UTIL1.getMiniHBaseCluster().getRegionServer(0).getServerName();
+ LOG.info("Starting replication source for producer {}, queue {}", producer, PEER_ID2);
+ try {
+ replicationServer.startReplicationSource(producer, PEER_ID2);
+ } catch (Throwable e) {
+ // Fail fast with the real cause instead of letting runSmallBatchTest() fail without it
+ throw new AssertionError("Failed to start replication source for " + producer, e);
+ }
+ runSmallBatchTest();
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 697a5ec..bd673bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -139,7 +139,7 @@ public class TestReplicationSource {
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
+ rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
@@ -177,8 +177,8 @@ public class TestReplicationSource {
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss, queueId,
- uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
+ rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, uuid,
+ p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
@@ -264,9 +264,9 @@ public class TestReplicationSource {
Configuration testConf = HBaseConfiguration.create();
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
- Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
- source.init(testConf, null, null, manager, null, mockPeer, null, "testPeer",
- null, p -> OptionalLong.empty(), null);
+ Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ source.init(testConf, null, null, manager, null, mockPeer, null, null, "testPeer", null,
+ p -> OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(
() -> source.terminate("testing source termination"));
@@ -289,7 +289,7 @@ public class TestReplicationSource {
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
- source.init(testConf, null, null, mockManager, null, mockPeer, null,
+ source.init(testConf, null, null, mockManager, null, mockPeer, null, null,
"testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
conf, null, 0, null, source, null);
@@ -315,7 +315,7 @@ public class TestReplicationSource {
reader.addEntryToBatch(batch, mockEntry);
reader.entryBatchQueue.put(batch);
source.terminate("test");
- assertEquals(0, source.controller.getTotalBufferUsed().get());
+ assertEquals(0, mockManager.getTotalBufferUsed().get());
}
/**
@@ -536,7 +536,7 @@ public class TestReplicationSource {
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
+ rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
return rss;
}
@@ -655,7 +655,7 @@ public class TestReplicationSource {
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
ReplicationSource source = new ReplicationSource();
- source.init(conf, null, null, manager, null, mockPeer, rss, id, null,
+ source.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), id, null,
p -> OptionalLong.empty(), metrics);
final Path log1 = new Path(logDir, "log-walgroup-a.8");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 44914a5..e6b745e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -414,7 +414,8 @@ public abstract class TestReplicationSourceManager {
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
ReplicationSourceInterface source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"),
- manager.getServer(), id, null, p -> OptionalLong.empty(), null);
+ manager.getServer(), manager.getServer().getServerName(), id, null, p -> OptionalLong.empty(),
+ null);
source.cleanOldWALs(file2, false);
// log1 should be deleted
assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -630,16 +631,16 @@ public abstract class TestReplicationSourceManager {
ReplicationSourceInterface source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(),
- mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), peerId2, null,
- p -> OptionalLong.empty(), null);
+ mockReplicationPeerForSyncReplication(peerId2), manager.getServer(),
+ manager.getServer().getServerName(), peerId2, null, p -> OptionalLong.empty(), null);
source.cleanOldWALs(walName, true);
// still there if peer id does not match
assertTrue(fs.exists(remoteWAL));
source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(),
- mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), slaveId, null,
- p -> OptionalLong.empty(), null);
+ mockReplicationPeerForSyncReplication(slaveId), manager.getServer(),
+ manager.getServer().getServerName(), slaveId, null, p -> OptionalLong.empty(), null);
source.cleanOldWALs(walName, true);
assertFalse(fs.exists(remoteWAL));
} finally {
@@ -819,9 +820,10 @@ public abstract class TestReplicationSourceManager {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
- ReplicationSourceController overallController, ReplicationQueueStorage rq, ReplicationPeer rp,
- Server server, String peerClusterId, UUID clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+ ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+ ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+ throws IOException {
+ // Always throws; none of the arguments (including the new producer parameter) are inspected.
throw new IOException("Failing deliberately");
}
}