Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/07 10:21:33 UTC
[32/50] [abbrv] hbase git commit: HBASE-19636 All rs should already start work with the new peer change when replication peer procedure is finished
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/00336214
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/00336214
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/00336214
Branch: refs/heads/HBASE-19397-branch-2
Commit: 00336214a7137ebe354f4f130ed73ea373e3eec9
Parents: 05cdee3
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu Jan 4 16:58:01 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Mar 7 18:15:25 2018 +0800
----------------------------------------------------------------------
.../replication/ReplicationPeerConfig.java | 1 -
.../hbase/replication/ReplicationPeerImpl.java | 4 +-
.../hbase/replication/ReplicationQueueInfo.java | 23 +-
.../hbase/replication/ReplicationUtils.java | 56 ++
.../replication/TestReplicationStateZKImpl.java | 21 -
.../regionserver/ReplicationSourceService.java | 3 +-
.../regionserver/PeerProcedureHandler.java | 3 +
.../regionserver/PeerProcedureHandlerImpl.java | 50 +-
.../RecoveredReplicationSource.java | 6 +-
.../RecoveredReplicationSourceShipper.java | 8 +-
.../replication/regionserver/Replication.java | 11 +-
.../regionserver/ReplicationSource.java | 34 +-
.../regionserver/ReplicationSourceFactory.java | 4 +-
.../ReplicationSourceInterface.java | 8 +-
.../regionserver/ReplicationSourceManager.java | 827 ++++++++++---------
.../regionserver/ReplicationSourceShipper.java | 6 +-
.../ReplicationSourceWALReader.java | 2 +-
.../replication/ReplicationSourceDummy.java | 2 +-
.../replication/TestNamespaceReplication.java | 57 +-
.../TestReplicationSourceManager.java | 5 +-
20 files changed, 622 insertions(+), 509 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index fdae288..bf8d030 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index 3e17025..604e0bb 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,6 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class ReplicationPeerImpl implements ReplicationPeer {
+
private final Configuration conf;
private final String id;
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
index ecd888f..cd65f9b 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ServerName;
/**
- * This class is responsible for the parsing logic for a znode representing a queue.
+ * This class is responsible for the parsing logic for a queue id representing a queue.
* It will extract the peerId if it's recovered as well as the dead region servers
* that were part of the queue's history.
*/
@@ -38,21 +38,20 @@ public class ReplicationQueueInfo {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueInfo.class);
private final String peerId;
- private final String peerClusterZnode;
+ private final String queueId;
private boolean queueRecovered;
// List of all the dead region servers that had this queue (if recovered)
private List<ServerName> deadRegionServers = new ArrayList<>();
/**
- * The passed znode will be either the id of the peer cluster or
- * the handling story of that queue in the form of id-servername-*
+ * The passed queueId will be either the id of the peer or the handling story of that queue
+ * in the form of id-servername-*
*/
- public ReplicationQueueInfo(String znode) {
- this.peerClusterZnode = znode;
- String[] parts = znode.split("-", 2);
+ public ReplicationQueueInfo(String queueId) {
+ this.queueId = queueId;
+ String[] parts = queueId.split("-", 2);
this.queueRecovered = parts.length != 1;
- this.peerId = this.queueRecovered ?
- parts[0] : peerClusterZnode;
+ this.peerId = this.queueRecovered ? parts[0] : queueId;
if (parts.length >= 2) {
// extract dead servers
extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
@@ -60,7 +59,7 @@ public class ReplicationQueueInfo {
}
/**
- * Parse dead server names from znode string servername can contain "-" such as
+ * Parse dead server names from queue id. servername can contain "-" such as
* "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
* cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
*/
@@ -119,8 +118,8 @@ public class ReplicationQueueInfo {
return this.peerId;
}
- public String getPeerClusterZnode() {
- return this.peerClusterZnode;
+ public String getQueueId() {
+ return this.queueId;
}
public boolean isQueueRecovered() {
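For reference, a minimal sketch of how the renamed queue id parsing behaves, reusing the servername example from the javadoc above (return values shown in comments; illustrative only, not part of the patch):

    // A normal source: the queue id is simply the peer id.
    ReplicationQueueInfo normal = new ReplicationQueueInfo("2");
    normal.getPeerId();        // "2"
    normal.getQueueId();       // "2"
    normal.isQueueRecovered(); // false

    // A recovered queue id embeds the dead server's name. Server names may
    // themselves contain "-", which the parser skips while extracting them.
    ReplicationQueueInfo recovered =
        new ReplicationQueueInfo("2-ip-10-46-221-101.ec2.internal,52170,1364333181125");
    recovered.getPeerId();        // "2"
    recovered.isQueueRecovered(); // true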
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index be70e6e..ca871ea 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -18,12 +18,16 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -76,4 +80,56 @@ public final class ReplicationUtils {
queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
}
}
+
+ private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) {
+ if (c1 == null) {
+ return c2 == null;
+ }
+ if (c2 == null) {
+ return false;
+ }
+ return c1.size() == c2.size() && c1.containsAll(c2);
+ }
+
+ private static boolean isNamespacesEqual(Set<String> ns1, Set<String> ns2) {
+ return isCollectionEqual(ns1, ns2);
+ }
+
+ private static boolean isTableCFsEqual(Map<TableName, List<String>> tableCFs1,
+ Map<TableName, List<String>> tableCFs2) {
+ if (tableCFs1 == null) {
+ return tableCFs2 == null;
+ }
+ if (tableCFs2 == null) {
+ return false;
+ }
+ if (tableCFs1.size() != tableCFs2.size()) {
+ return false;
+ }
+ for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) {
+ TableName table = entry1.getKey();
+ if (!tableCFs2.containsKey(table)) {
+ return false;
+ }
+ List<String> cfs1 = entry1.getValue();
+ List<String> cfs2 = tableCFs2.get(table);
+ if (!isCollectionEqual(cfs1, cfs2)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) {
+ if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
+ return false;
+ }
+ if (rpc1.replicateAllUserTables()) {
+ return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
+ isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
+ } else {
+ return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces()) &&
+ isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
+ }
+ }
}
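The new isKeyConfigEqual is what lets a region server decide whether a peer config update actually changes what gets replicated. A hedged usage sketch (the setters below follow the pre-builder ReplicationPeerConfig API and are assumed here for illustration):

    ReplicationPeerConfig oldConfig = new ReplicationPeerConfig();
    oldConfig.setClusterKey("zk1:2181:/hbase");

    ReplicationPeerConfig newConfig = new ReplicationPeerConfig();
    newConfig.setClusterKey("zk1:2181:/hbase");
    newConfig.setBandwidth(1048576); // a throttling-only change

    // true: bandwidth is not part of the "key" config (namespaces/table-CFs),
    // so PeerProcedureHandlerImpl will not refresh the replication sources.
    ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig);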
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 1830103..08178f4 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
@@ -38,8 +37,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
@@ -48,8 +45,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class);
-
private static Configuration conf;
private static HBaseZKTestingUtility utility;
private static ZKWatcher zkw;
@@ -97,20 +92,4 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
public static void tearDownAfterClass() throws Exception {
utility.shutdownMiniZKCluster();
}
-
- private static class WarnOnlyAbortable implements Abortable {
-
- @Override
- public void abort(String why, Throwable e) {
- LOG.warn("TestReplicationStateZKImpl received abort, ignoring. Reason: " + why);
- if (LOG.isDebugEnabled()) {
- LOG.debug(e.toString(), e);
- }
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 977dd15..23ba773 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
index b392985..65da9af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
@@ -23,6 +23,9 @@ import java.io.IOException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
+/**
+ * A handler for modifying replication peer in peer procedures.
+ */
@InterfaceAudience.Private
public interface PeerProcedureHandler {
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index c09c6a0..ce8fdae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -15,21 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
- private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
private final ReplicationSourceManager replicationSourceManager;
private final KeyLocker<String> peersLock = new KeyLocker<>();
@@ -39,7 +38,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
}
@Override
- public void addPeer(String peerId) throws ReplicationException, IOException {
+ public void addPeer(String peerId) throws IOException {
Lock peerLock = peersLock.acquireLock(peerId);
try {
replicationSourceManager.addPeer(peerId);
@@ -49,7 +48,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
}
@Override
- public void removePeer(String peerId) throws ReplicationException, IOException {
+ public void removePeer(String peerId) throws IOException {
Lock peerLock = peersLock.acquireLock(peerId);
try {
if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
@@ -60,35 +59,50 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
}
}
- @Override
- public void disablePeer(String peerId) throws ReplicationException, IOException {
+ private void refreshPeerState(String peerId) throws ReplicationException, IOException {
PeerState newState;
Lock peerLock = peersLock.acquireLock(peerId);
try {
+ ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+ if (peer == null) {
+ throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+ }
+ PeerState oldState = peer.getPeerState();
newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
+ // RS need to start work with the new replication state change
+ if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
+ replicationSourceManager.refreshSources(peerId);
+ }
} finally {
peerLock.unlock();
}
- LOG.info("disable replication peer, id: {}, new state: {}", peerId, newState);
}
@Override
public void enablePeer(String peerId) throws ReplicationException, IOException {
- PeerState newState;
- Lock peerLock = peersLock.acquireLock(peerId);
- try {
- newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
- } finally {
- peerLock.unlock();
- }
- LOG.info("enable replication peer, id: {}, new state: {}", peerId, newState);
+ refreshPeerState(peerId);
+ }
+
+ @Override
+ public void disablePeer(String peerId) throws ReplicationException, IOException {
+ refreshPeerState(peerId);
}
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
Lock peerLock = peersLock.acquireLock(peerId);
try {
- replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
+ ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+ if (peer == null) {
+ throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+ }
+ ReplicationPeerConfig oldConfig = peer.getPeerConfig();
+ ReplicationPeerConfig newConfig =
+ replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
+ // RS need to start work with the new replication config change
+ if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
+ replicationSourceManager.refreshSources(peerId);
+ }
} finally {
peerLock.unlock();
}
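The handler serializes all procedure callbacks for the same peer through a per-key lock, so add/remove/enable/disable/update for one peer never interleave while different peers proceed concurrently. A condensed sketch of that pattern (withPeerLock is a hypothetical helper, not part of this patch):

    import java.util.concurrent.locks.Lock;
    import org.apache.hadoop.hbase.util.KeyLocker;

    // One lock per peer id: callbacks for the same peer serialize, while
    // procedures touching different peers can run concurrently.
    final KeyLocker<String> peersLock = new KeyLocker<>();

    void withPeerLock(String peerId, Runnable op) {
      Lock lock = peersLock.acquireLock(peerId);
      try {
        op.run();
      } finally {
        lock.unlock();
      }
    }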
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 7bceb78..1be9a88 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -81,7 +81,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
conf, queue, startPosition, walEntryFilter, this);
Threads.setDaemonThreadRunning(walReader, threadName
- + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
+ + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
getUncaughtExceptionHandler());
return walReader;
}
@@ -178,8 +178,8 @@ public class RecoveredReplicationSource extends ReplicationSource {
}
}
if (allTasksDone) {
- manager.closeRecoveredQueue(this);
- LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
+ manager.removeRecoveredSource(this);
+ LOG.info("Finished recovering queue " + queueId + " with the following stats: "
+ getStats());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 1c0bbee..38bbb48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -76,7 +76,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
shipEdits(entryBatch);
if (entryBatch.getWalEntries().isEmpty()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
- + source.getPeerClusterZnode());
+ + source.getQueueId());
source.getSourceMetrics().incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
continue;
@@ -113,7 +113,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
// normally has a position (unless the RS failed between 2 logs)
private long getRecoveredQueueStartPos() {
long startPosition = 0;
- String peerClusterZnode = source.getPeerClusterZnode();
+ String peerClusterZnode = source.getQueueId();
try {
startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
peerClusterZnode, this.queue.peek().getName());
@@ -129,8 +129,8 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
@Override
protected void updateLogPosition(long lastReadPosition) {
- source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
- lastReadPosition, true, false);
+ source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
+ lastReadPosition, true);
lastLoggedPosition = lastReadPosition;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 5f8d0aa..6c46a85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
-import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -187,11 +186,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
*/
@Override
public void startReplicationService() throws IOException {
- try {
- this.replicationManager.init();
- } catch (ReplicationException e) {
- throw new IOException(e);
- }
+ this.replicationManager.init();
this.replicationSink = new ReplicationSink(this.conf, this.server);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
@@ -210,9 +205,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
throws IOException {
try {
this.replicationManager.addHFileRefs(tableName, family, pairs);
- } catch (ReplicationException e) {
+ } catch (IOException e) {
LOG.error("Failed to add hfile references in the replication queue.", e);
- throw new IOException(e);
+ throw e;
}
}
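Dropping the try/catch in startReplicationService() works because replicationManager.init() now translates storage-layer ReplicationException into IOException itself, via the helpers this patch adds to ReplicationSourceManager (see the hunks further down); in sketch form:

    @FunctionalInterface
    interface ReplicationQueueOperation {
      void exec() throws ReplicationException;
    }

    void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
      try {
        op.exec();
      } catch (ReplicationException e) {
        throw new IOException(e);
      }
    }

    // e.g. inside init():
    // throwIOExceptionWhenFail(() -> queueStorage.addPeerToHFileRefs(id));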
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index ffed88d..0092251 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -105,7 +105,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// total number of edits we replicated
private AtomicLong totalReplicatedEdits = new AtomicLong(0);
// The znode we currently play with
- protected String peerClusterZnode;
+ protected String queueId;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Indicates if this particular source is running
@@ -141,14 +141,14 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* @param fs file system to use
* @param manager replication manager to ping to
* @param server the server for this region server
- * @param peerClusterZnode the name of our znode
+ * @param queueId the id of our replication queue
* @param clusterId unique UUID for the cluster
* @param metrics metrics for replication source
*/
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+ String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException {
this.server = server;
this.conf = HBaseConfiguration.create(conf);
@@ -167,8 +167,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.metrics = metrics;
this.clusterId = clusterId;
- this.peerClusterZnode = peerClusterZnode;
- this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
+ this.queueId = queueId;
+ this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
@@ -178,7 +178,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;
- LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+ LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
+ ", currentBandwidth=" + this.currentBandwidth);
}
@@ -216,12 +216,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
- String peerId = peerClusterZnode;
- if (peerId.contains("-")) {
- // peerClusterZnode will be in the form peerId + "-" + rsZNode.
- // A peerId will not have "-" in its name, see HBASE-11394
- peerId = peerClusterZnode.split("-")[0];
- }
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) {
List<String> tableCfs = tableCFMap.get(tableName);
@@ -310,7 +304,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
- this.manager.closeQueue(this);
+ this.manager.removeSource(this);
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
@@ -355,7 +349,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
ReplicationSourceWALReader walReader =
new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
- threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
+ threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
getUncaughtExceptionHandler());
}
@@ -449,7 +443,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
LOG.error("Unexpected exception in ReplicationSource", e);
}
};
- Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode,
+ Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId,
handler);
}
@@ -465,9 +459,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public void terminate(String reason, Exception cause, boolean join) {
if (cause == null) {
- LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason);
+ LOG.info("Closing source " + this.queueId + " because: " + reason);
} else {
- LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason,
+ LOG.error("Closing source " + this.queueId + " because an error occurred: " + reason,
cause);
}
this.sourceRunning = false;
@@ -491,7 +485,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
- + this.peerClusterZnode,
+ + this.queueId,
te);
}
}
@@ -499,8 +493,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
@Override
- public String getPeerClusterZnode() {
- return this.peerClusterZnode;
+ public String getQueueId() {
+ return this.queueId;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 865a202..93e8331 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -32,8 +32,8 @@ public class ReplicationSourceFactory {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class);
- static ReplicationSourceInterface create(Configuration conf, String peerId) {
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+ static ReplicationSourceInterface create(Configuration conf, String queueId) {
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
ReplicationSourceInterface src;
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 4f10c73..d7cf9a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -51,7 +51,7 @@ public interface ReplicationSourceInterface {
*/
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+ String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException;
/**
@@ -96,11 +96,11 @@ public interface ReplicationSourceInterface {
Path getCurrentPath();
/**
- * Get the id that the source is replicating to
+ * Get the queue id that the source is replicating to
*
- * @return peer cluster id
+ * @return queue id
*/
- String getPeerClusterZnode();
+ String getQueueId();
/**
* Get the id that the source is replicating to.
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 968b3fb..6965f55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
@@ -32,7 +30,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
@@ -66,27 +64,53 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
- * This class is responsible to manage all the replication
- * sources. There are two classes of sources:
+ * This class is responsible to manage all the replication sources. There are two classes of
+ * sources:
* <ul>
- * <li> Normal sources are persistent and one per peer cluster</li>
- * <li> Old sources are recovered from a failed region server and our
- * only goal is to finish replicating the WAL queue it had up in ZK</li>
+ * <li>Normal sources are persistent and one per peer cluster</li>
+ * <li>Old sources are recovered from a failed region server and our only goal is to finish
+ * replicating the WAL queue it had</li>
+ * </ul>
+ * <p>
+ * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
+ * in order to transfer all the queues in a local old source.
+ * <p>
+ * Synchronization specification:
+ * <ul>
+ * <li>No need synchronized on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
+ * is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
+ * operations.</li>
+ * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
+ * {@link #addPeer(String)}, {@link #removePeer(String)},
+ * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}. {@link #walsById}
+ * is a ConcurrentHashMap and there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So
+ * there is no race between {@link #addPeer(String)} and {@link #removePeer(String)}.
+ * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}.
+ * So no race with {@link #addPeer(String)}. {@link #removePeer(String)} will terminate the
+ * {@link ReplicationSourceInterface} firstly, then remove the wals from {@link #walsById}. So no
+ * race with {@link #removePeer(String)}. The only case need synchronized is
+ * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}.</li>
+ * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
+ * modify it, {@link #removePeer(String)} , {@link #cleanOldLogs(SortedSet, String, String)} and
+ * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
+ * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}.
+ * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
+ * remove the wals from {@link #walsByIdRecoveredQueues}. And
+ * {@link ReplicationSourceManager.NodeFailoverWorker#run()} will add the wals to
+ * {@link #walsByIdRecoveredQueues} firstly, then start up a {@link ReplicationSourceInterface}. So
+ * there is no race here. For {@link ReplicationSourceManager.NodeFailoverWorker#run()} and
+ * {@link #removePeer(String)}, there is already synchronized on {@link #oldsources}. So no need
+ * synchronized on {@link #walsByIdRecoveredQueues}.</li>
+ * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
+ * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
+ * to-be-removed peer.</li>
* </ul>
- *
- * When a region server dies, this class uses a watcher to get notified and it
- * tries to grab a lock in order to transfer all the queues in a local
- * old source.
- *
- * This class implements the ReplicationListener interface so that it can track changes in
- * replication state.
*/
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
- private static final Logger LOG =
- LoggerFactory.getLogger(ReplicationSourceManager.class);
- // List of all the sources that read this RS's logs
- private final List<ReplicationSourceInterface> sources;
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
+ // all the sources that read this RS's logs and every peer only has one replication source
+ private final ConcurrentMap<String, ReplicationSourceInterface> sources;
// List of all the sources we got from died RSs
private final List<ReplicationSourceInterface> oldsources;
private final ReplicationQueueStorage queueStorage;
@@ -96,11 +120,16 @@ public class ReplicationSourceManager implements ReplicationListener {
private final UUID clusterId;
// All about stopping
private final Server server;
+
// All logs we are currently tracking
- // Index structure of the map is: peer_id->logPrefix/logGroup->logs
- private final Map<String, Map<String, SortedSet<String>>> walsById;
+ // Index structure of the map is: queue_id->logPrefix/logGroup->logs
+ // For normal replication source, the peer id is same with the queue id
+ private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsById;
// Logs for recovered sources we are currently tracking
- private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
+ // the map is: queue_id->logPrefix/logGroup->logs
+ // For recovered source, the queue id's format is peer_id-servername-*
+ private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
+
private final Configuration conf;
private final FileSystem fs;
// The paths to the latest log of each wal group, for new coming peers
@@ -136,22 +165,22 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
WALFileLengthProvider walFileLengthProvider) throws IOException {
- //CopyOnWriteArrayList is thread-safe.
- //Generally, reading is more than modifying.
- this.sources = new CopyOnWriteArrayList<>();
+ // CopyOnWriteArrayList is thread-safe.
+ // Generally, reading is more than modifying.
+ this.sources = new ConcurrentHashMap<>();
this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.server = server;
- this.walsById = new HashMap<>();
+ this.walsById = new ConcurrentHashMap<>();
this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
- this.oldsources = new CopyOnWriteArrayList<>();
+ this.oldsources = new ArrayList<>();
this.conf = conf;
this.fs = fs;
this.logDir = logDir;
this.oldLogDir = oldLogDir;
- this.sleepBeforeFailover =
- conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
+ this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30
+ // seconds
this.clusterId = clusterId;
this.walFileLengthProvider = walFileLengthProvider;
this.replicationTracker.registerListener(this);
@@ -160,8 +189,8 @@ public class ReplicationSourceManager implements ReplicationListener {
int nbWorkers = conf.getInt("replication.executor.workers", 1);
// use a short 100ms sleep since this could be done inline with a RS startup
// even if we fail, other region servers can take care of it
- this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
- 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+ this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>());
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat("ReplicationExecutor-%d");
tfb.setDaemon(true);
@@ -171,74 +200,22 @@ public class ReplicationSourceManager implements ReplicationListener {
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
}
- @FunctionalInterface
- private interface ReplicationQueueOperation {
- void exec() throws ReplicationException;
- }
-
- private void abortWhenFail(ReplicationQueueOperation op) {
- try {
- op.exec();
- } catch (ReplicationException e) {
- server.abort("Failed to operate on replication queue", e);
- }
- }
-
/**
- * Provide the id of the peer and a log key and this method will figure which
- * wal it belongs to and will log, for this region server, the current
- * position. It will also clean old logs from the queue.
- * @param log Path to the log currently being replicated from
- * replication status in zookeeper. It will also delete older entries.
- * @param id id of the peer cluster
- * @param position current location in the log
- * @param queueRecovered indicates if this queue comes from another region server
- * @param holdLogInZK if true then the log is retained in ZK
- */
- public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
- boolean holdLogInZK) {
- String fileName = log.getName();
- abortWhenFail(
- () -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position));
- if (holdLogInZK) {
- return;
- }
- cleanOldLogs(fileName, id, queueRecovered);
- }
-
- /**
- * Cleans a log file and all older files from ZK. Called when we are sure that a
- * log file is closed and has no more entries.
- * @param key Path to the log
- * @param id id of the peer cluster
- * @param queueRecovered Whether this is a recovered queue
+ * Adds a normal source per registered peer cluster and tries to process all old region server wal
+ * queues
+ * <p>
+ * The returned future is for adoptAbandonedQueues task.
*/
- public void cleanOldLogs(String key, String id, boolean queueRecovered) {
- String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key);
- if (queueRecovered) {
- SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
- if (wals != null && !wals.first().equals(key)) {
- cleanOldLogs(wals, key, id);
- }
- } else {
- synchronized (this.walsById) {
- SortedSet<String> wals = walsById.get(id).get(logPrefix);
- if (wals != null && !wals.first().equals(key)) {
- cleanOldLogs(wals, key, id);
- }
+ Future<?> init() throws IOException {
+ for (String id : this.replicationPeers.getAllPeerIds()) {
+ addSource(id);
+ if (replicationForBulkLoadDataEnabled) {
+ // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
+ // when a peer was added before replication for bulk loaded data was enabled.
+ throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
}
}
- }
-
- private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
- SortedSet<String> walSet = wals.headSet(key);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
- }
- for (String wal : walSet) {
- abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
- }
- walSet.clear();
+ return this.executor.submit(this::adoptAbandonedQueues);
}
private void adoptAbandonedQueues() {
@@ -254,8 +231,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
.map(ServerName::valueOf).collect(Collectors.toList());
- LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
- + otherRegionServers);
+ LOG.info(
+ "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
// Look if there's anything to process after a restart
for (ServerName rs : currentReplicators) {
@@ -266,56 +243,112 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
- * Adds a normal source per registered peer cluster and tries to process all old region server wal
- * queues
- * <p>
- * The returned future is for adoptAbandonedQueues task.
+ * 1. Add peer to replicationPeers 2. Add the normal source and related replication queue 3. Add
+ * HFile Refs
+ * @param peerId the id of replication peer
*/
- Future<?> init() throws IOException, ReplicationException {
- for (String id : this.replicationPeers.getAllPeerIds()) {
- addSource(id);
+ public void addPeer(String peerId) throws IOException {
+ boolean added = false;
+ try {
+ added = this.replicationPeers.addPeer(peerId);
+ } catch (ReplicationException e) {
+ throw new IOException(e);
+ }
+ if (added) {
+ addSource(peerId);
if (replicationForBulkLoadDataEnabled) {
- // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
- // when a peer was added before replication for bulk loaded data was enabled.
- this.queueStorage.addPeerToHFileRefs(id);
+ throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
}
}
- return this.executor.submit(this::adoptAbandonedQueues);
}
/**
- * Add sources for the given peer cluster on this region server. For the newly added peer, we only
- * need to enqueue the latest log of each wal group and do replication
- * @param id the id of the peer cluster
+ * 1. Remove peer for replicationPeers 2. Remove all the recovered sources for the specified id
+ * and related replication queues 3. Remove the normal source and related replication queue 4.
+ * Remove HFile Refs
+ * @param peerId the id of the replication peer
+ */
+ public void removePeer(String peerId) {
+ replicationPeers.removePeer(peerId);
+ String terminateMessage = "Replication stream was removed by a user";
+ List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
+ // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
+ // see NodeFailoverWorker.run
+ synchronized (this.oldsources) {
+ // First close all the recovered sources for this peer
+ for (ReplicationSourceInterface src : oldsources) {
+ if (peerId.equals(src.getPeerId())) {
+ oldSourcesToDelete.add(src);
+ }
+ }
+ for (ReplicationSourceInterface src : oldSourcesToDelete) {
+ src.terminate(terminateMessage);
+ removeRecoveredSource(src);
+ }
+ }
+ LOG.info(
+ "Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size());
+ // Now close the normal source for this peer
+ ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
+ if (srcToRemove != null) {
+ srcToRemove.terminate(terminateMessage);
+ removeSource(srcToRemove);
+ } else {
+ // This only happened in unit test TestReplicationSourceManager#testPeerRemovalCleanup
+ // Delete queue from storage and memory and queue id is same with peer id for normal
+ // source
+ deleteQueue(peerId);
+ this.walsById.remove(peerId);
+ }
+
+ // Remove HFile Refs
+ abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
+ }
+
+ /**
+ * Factory method to create a replication source
+ * @param queueId the id of the replication queue
+ * @return the created source
+ */
+ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
+ throws IOException {
+ ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
+
+ MetricsSource metrics = new MetricsSource(queueId);
+ // init replication source
+ src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
+ walFileLengthProvider, metrics);
+ return src;
+ }
+
+ /**
+ * Add a normal source for the given peer on this region server. Meanwhile, add new replication
+ * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
+ * group and do replication
+ * @param peerId the id of the replication peer
* @return the source that was created
*/
@VisibleForTesting
- ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
- ReplicationPeer peer = replicationPeers.getPeer(id);
- ReplicationSourceInterface src = getReplicationSource(id, peer);
- synchronized (this.walsById) {
- this.sources.add(src);
+ ReplicationSourceInterface addSource(String peerId) throws IOException {
+ ReplicationPeer peer = replicationPeers.getPeer(peerId);
+ ReplicationSourceInterface src = createSource(peerId, peer);
+ // synchronized on latestPaths to avoid missing the new log
+ synchronized (this.latestPaths) {
+ this.sources.put(peerId, src);
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
- this.walsById.put(id, walsByGroup);
+ this.walsById.put(peerId, walsByGroup);
// Add the latest wal to that source's queue
- synchronized (latestPaths) {
- if (this.latestPaths.size() > 0) {
- for (Path logPath : latestPaths) {
- String name = logPath.getName();
- String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
- SortedSet<String> logs = new TreeSet<>();
- logs.add(name);
- walsByGroup.put(walPrefix, logs);
- try {
- this.queueStorage.addWAL(server.getServerName(), id, name);
- } catch (ReplicationException e) {
- String message = "Cannot add log to queue when creating a new source, queueId=" + id +
- ", filename=" + name;
- server.stop(message);
- throw e;
- }
- src.enqueueLog(logPath);
- }
+ if (this.latestPaths.size() > 0) {
+ for (Path logPath : latestPaths) {
+ String name = logPath.getName();
+ String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
+ SortedSet<String> logs = new TreeSet<>();
+ logs.add(name);
+ walsByGroup.put(walPrefix, logs);
+ // Abort RS and throw exception to make add peer failed
+ abortAndThrowIOExceptionWhenFail(
+ () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
+ src.enqueueLog(logPath);
}
}
}
@@ -323,89 +356,219 @@ public class ReplicationSourceManager implements ReplicationListener {
return src;
}
- @VisibleForTesting
- int getSizeOfLatestPath() {
- synchronized (latestPaths) {
- return latestPaths.size();
- }
- }
-
/**
- * Delete a complete queue of wals associated with a peer cluster
- * @param peerId Id of the peer cluster queue of wals to delete
+ * Close the previous replication sources of this peer id and open new sources to trigger the new
+ * replication state changes or new replication config changes. Here we don't need to change
+ * replication queue storage and only to enqueue all logs to the new replication source
+ * @param peerId the id of the replication peer
+ * @throws IOException
*/
- public void deleteSource(String peerId, boolean closeConnection) {
- abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
- if (closeConnection) {
- this.replicationPeers.removePeer(peerId);
+ public void refreshSources(String peerId) throws IOException {
+ String terminateMessage = "Peer " + peerId +
+ " state or config changed. Will close the previous replication source and open a new one";
+ ReplicationPeer peer = replicationPeers.getPeer(peerId);
+ ReplicationSourceInterface src = createSource(peerId, peer);
+ // synchronized on latestPaths to avoid missing the new log
+ synchronized (this.latestPaths) {
+ ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
+ if (toRemove != null) {
+ LOG.info("Terminate replication source for " + toRemove.getPeerId());
+ toRemove.terminate(terminateMessage);
+ }
+ for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
+ walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
+ }
}
- }
+ LOG.info("Startup replication source for " + src.getPeerId());
+ src.startup();
- /**
- * Terminate the replication on this region server
- */
- public void join() {
- this.executor.shutdown();
- for (ReplicationSourceInterface source : this.sources) {
- source.terminate("Region server is closing");
+ List<ReplicationSourceInterface> toStartup = new ArrayList<>();
+ // synchronized on oldsources to avoid race with NodeFailoverWorker
+ synchronized (this.oldsources) {
+ List<String> previousQueueIds = new ArrayList<>();
+ for (ReplicationSourceInterface oldSource : this.oldsources) {
+ if (oldSource.getPeerId().equals(peerId)) {
+ previousQueueIds.add(oldSource.getQueueId());
+ oldSource.terminate(terminateMessage);
+ this.oldsources.remove(oldSource);
+ }
+ }
+ for (String queueId : previousQueueIds) {
+ ReplicationSourceInterface replicationSource = createSource(queueId, peer);
+ this.oldsources.add(replicationSource);
+ for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
+ walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal)));
+ }
+ toStartup.add(replicationSource);
+ }
+ }
+ for (ReplicationSourceInterface replicationSource : oldsources) {
+ replicationSource.startup();
}
}
/**
- * Get a copy of the wals of the first source on this rs
- * @return a sorted set of wal names
+ * Clear the metrics and related replication queue of the specified old source
+ * @param src source to clear
*/
- @VisibleForTesting
- Map<String, Map<String, SortedSet<String>>> getWALs() {
- return Collections.unmodifiableMap(walsById);
+ void removeRecoveredSource(ReplicationSourceInterface src) {
+ LOG.info("Done with the recovered queue " + src.getQueueId());
+ src.getSourceMetrics().clear();
+ this.oldsources.remove(src);
+ // Delete queue from storage and memory
+ deleteQueue(src.getQueueId());
+ this.walsByIdRecoveredQueues.remove(src.getQueueId());
}
/**
- * Get a copy of the wals of the recovered sources on this rs
- * @return a sorted set of wal names
+ * Clear the metrics and related replication queue of the specified old source
+ * @param src source to clear
*/
- @VisibleForTesting
- Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
- return Collections.unmodifiableMap(walsByIdRecoveredQueues);
+ void removeSource(ReplicationSourceInterface src) {
+ LOG.info("Done with the queue " + src.getQueueId());
+ src.getSourceMetrics().clear();
+ this.sources.remove(src.getPeerId());
+ // Delete queue from storage and memory
+ deleteQueue(src.getQueueId());
+ this.walsById.remove(src.getQueueId());
}
/**
- * Get a list of all the normal sources of this rs
- * @return lis of all sources
+ * Delete a complete queue of wals associated with a replication source
+ * @param queueId the id of replication queue to delete
*/
- public List<ReplicationSourceInterface> getSources() {
- return this.sources;
+ private void deleteQueue(String queueId) {
+ abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
+ }
+
+ @FunctionalInterface
+ private interface ReplicationQueueOperation {
+ void exec() throws ReplicationException;
+ }
+
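+ // The helpers below wrap a queue storage operation with one of three failure
+ // policies: abort the region server, rethrow as IOException for the caller,
+ // or both (used when a failure must also fail the ongoing log roll).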
+ private void abortWhenFail(ReplicationQueueOperation op) {
+ try {
+ op.exec();
+ } catch (ReplicationException e) {
+ server.abort("Failed to operate on replication queue", e);
+ }
+ }
+
+ private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
+ try {
+ op.exec();
+ } catch (ReplicationException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
+ try {
+ op.exec();
+ } catch (ReplicationException e) {
+ server.abort("Failed to operate on replication queue", e);
+ throw new IOException(e);
+ }
}
/**
- * Get a list of all the old sources of this rs
- * @return list of all old sources
+ * This method will log the current position to storage and also clean old logs from the
+ * replication queue.
+ * @param log Path to the log currently being replicated
+ * @param queueId id of the replication queue
+ * @param position current location in the log
+ * @param queueRecovered indicates if this queue comes from another region server
*/
- public List<ReplicationSourceInterface> getOldSources() {
- return this.oldsources;
+ public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
+ boolean queueRecovered) {
+ String fileName = log.getName();
+ abortWhenFail(
+ () -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, position));
+ cleanOldLogs(fileName, queueId, queueRecovered);
}
/**
- * Get the normal source for a given peer
- * @param peerId
- * @return the normal source for the give peer if it exists, otherwise null.
+ * Removes a log file and all older logs from the replication queue. Called when we are sure that a log
+ * file is closed and has no more entries.
+ * @param log Path to the log
+ * @param queueId id of the replication queue
+ * @param queueRecovered Whether this is a recovered queue
*/
- public ReplicationSourceInterface getSource(String peerId) {
- return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
+ @VisibleForTesting
+ void cleanOldLogs(String log, String queueId, boolean queueRecovered) {
+ String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+ if (queueRecovered) {
+ SortedSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
+ if (wals != null && !wals.first().equals(log)) {
+ cleanOldLogs(wals, log, queueId);
+ }
+ } else {
+ // synchronized on walsById to avoid race with preLogRoll
+ synchronized (this.walsById) {
+ SortedSet<String> wals = walsById.get(queueId).get(logPrefix);
+ if (wals != null && !wals.first().equals(log)) {
+ cleanOldLogs(wals, log, queueId);
+ }
+ }
+ }
}
- @VisibleForTesting
- List<String> getAllQueues() throws ReplicationException {
- return queueStorage.getAllQueues(server.getServerName());
+ private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
+ SortedSet<String> walSet = wals.headSet(key);
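+ // headSet is a view backed by wals, so clearing it below also removes the entries from the underlying walsById / walsByIdRecoveredQueues set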
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+ }
+ for (String wal : walSet) {
+ abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
+ }
+ walSet.clear();
}
// public because we call it in TestReplicationEmptyWALRecovery
@VisibleForTesting
public void preLogRoll(Path newLog) throws IOException {
- recordLog(newLog);
String logName = newLog.getName();
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
- synchronized (latestPaths) {
+ // synchronized on latestPaths to avoid a newly opened source missing the new log
+ synchronized (this.latestPaths) {
+ // Add log to queue storage
+ for (ReplicationSourceInterface source : this.sources.values()) {
+ // If record log to queue storage failed, abort RS and throw exception to make log roll
+ // failed
+ abortAndThrowIOExceptionWhenFail(
+ () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
+ }
+
+ // synchronized on walsById to avoid race with cleanOldLogs
+ synchronized (this.walsById) {
+ // Update walsById map
+ for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
+ String peerId = entry.getKey();
+ Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
+ boolean existingPrefix = false;
+ for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
+ SortedSet<String> wals = walsEntry.getValue();
+ if (this.sources.isEmpty()) {
+ // If there are no slaves, we don't need to keep the old wals since
+ // we only consider the last one when a new slave comes in
+ wals.clear();
+ }
+ if (logPrefix.equals(walsEntry.getKey())) {
+ wals.add(logName);
+ existingPrefix = true;
+ }
+ }
+ if (!existingPrefix) {
+ // The new log belongs to a new group, add it into this peer
+ LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
+ SortedSet<String> wals = new TreeSet<>();
+ wals.add(logName);
+ walsByPrefix.put(logPrefix, wals);
+ }
+ }
+ }
+
+ // Add to latestPaths
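+ // Keep only the most recent path per wal group: any previous path with the same prefix is replaced by the new log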
Iterator<Path> iterator = latestPaths.iterator();
while (iterator.hasNext()) {
Path path = iterator.next();
@@ -418,89 +581,23 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- /**
- * Check and enqueue the given log to the correct source. If there's still no source for the
- * group to which the given log belongs, create one
- * @param logPath the log path to check and enqueue
- * @throws IOException
- */
- private void recordLog(Path logPath) throws IOException {
- String logName = logPath.getName();
- String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
- // update replication queues on ZK
- // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
- synchronized (replicationPeers) {
- for (String id : replicationPeers.getAllPeerIds()) {
- try {
- this.queueStorage.addWAL(server.getServerName(), id, logName);
- } catch (ReplicationException e) {
- throw new IOException("Cannot add log to replication queue"
- + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
- }
- }
- }
- // update walsById map
- synchronized (walsById) {
- for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
- String peerId = entry.getKey();
- Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
- boolean existingPrefix = false;
- for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
- SortedSet<String> wals = walsEntry.getValue();
- if (this.sources.isEmpty()) {
- // If there's no slaves, don't need to keep the old wals since
- // we only consider the last one when a new slave comes in
- wals.clear();
- }
- if (logPrefix.equals(walsEntry.getKey())) {
- wals.add(logName);
- existingPrefix = true;
- }
- }
- if (!existingPrefix) {
- // The new log belongs to a new group, add it into this peer
- LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
- SortedSet<String> wals = new TreeSet<>();
- wals.add(logName);
- walsByPrefix.put(logPrefix, wals);
- }
- }
- }
- }
-
// public because we call it in TestReplicationEmptyWALRecovery
@VisibleForTesting
public void postLogRoll(Path newLog) throws IOException {
// This only updates the sources we own, not the recovered ones
- for (ReplicationSourceInterface source : this.sources) {
+ for (ReplicationSourceInterface source : this.sources.values()) {
source.enqueueLog(newLog);
}
}
- @VisibleForTesting
- public AtomicLong getTotalBufferUsed() {
- return totalBufferUsed;
- }
-
- /**
- * Factory method to create a replication source
- * @param peerId the id of the peer cluster
- * @return the created source
- */
- private ReplicationSourceInterface getReplicationSource(String peerId,
- ReplicationPeer replicationPeer) throws IOException {
- ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId);
-
- MetricsSource metrics = new MetricsSource(peerId);
- // init replication source
- src.init(conf, fs, this, queueStorage, replicationPeer, server, peerId, clusterId,
- walFileLengthProvider, metrics);
- return src;
+ @Override
+ public void regionServerRemoved(String regionserver) {
+ transferQueues(ServerName.valueOf(regionserver));
}
/**
* Transfer all the queues of the specified region server to this one. First it tries to grab a lock
- * and if it works it will move the znodes and finally will delete the old znodes.
+ * and if it works it will move the old queues and finally delete them.
* <p>
* It creates one old source for any type of source of the old rs.
*/
@@ -518,102 +615,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
- * Clear the references to the specified old source
- * @param src source to clear
- */
- public void closeRecoveredQueue(ReplicationSourceInterface src) {
- LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
- if (src instanceof ReplicationSource) {
- ((ReplicationSource) src).getSourceMetrics().clear();
- }
- this.oldsources.remove(src);
- deleteSource(src.getPeerClusterZnode(), false);
- this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
- }
-
- /**
- * Clear the references to the specified old source
- * @param src source to clear
- */
- public void closeQueue(ReplicationSourceInterface src) {
- LOG.info("Done with the queue " + src.getPeerClusterZnode());
- src.getSourceMetrics().clear();
- this.sources.remove(src);
- deleteSource(src.getPeerClusterZnode(), true);
- this.walsById.remove(src.getPeerClusterZnode());
- }
-
- public void addPeer(String id) throws ReplicationException, IOException {
- LOG.info("Trying to add peer, peerId: " + id);
- boolean added = this.replicationPeers.addPeer(id);
- if (added) {
- LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
- addSource(id);
- if (replicationForBulkLoadDataEnabled) {
- this.queueStorage.addPeerToHFileRefs(id);
- }
- }
- }
-
- /**
- * Thie method first deletes all the recovered sources for the specified
- * id, then deletes the normal source (deleting all related data in ZK).
- * @param id The id of the peer cluster
- */
- public void removePeer(String id) {
- LOG.info("Closing the following queue " + id + ", currently have "
- + sources.size() + " and another "
- + oldsources.size() + " that were recovered");
- String terminateMessage = "Replication stream was removed by a user";
- List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
- // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
- // see NodeFailoverWorker.run
- synchronized (oldsources) {
- // First close all the recovered sources for this peer
- for (ReplicationSourceInterface src : oldsources) {
- if (id.equals(src.getPeerId())) {
- oldSourcesToDelete.add(src);
- }
- }
- for (ReplicationSourceInterface src : oldSourcesToDelete) {
- src.terminate(terminateMessage);
- closeRecoveredQueue(src);
- }
- }
- LOG.info("Number of deleted recovered sources for " + id + ": "
- + oldSourcesToDelete.size());
- // Now look for the one on this cluster
- List<ReplicationSourceInterface> srcToRemove = new ArrayList<>();
- // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
- synchronized (this.replicationPeers) {
- for (ReplicationSourceInterface src : this.sources) {
- if (id.equals(src.getPeerId())) {
- srcToRemove.add(src);
- }
- }
- if (srcToRemove.isEmpty()) {
- LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " +
- "This could mean that ReplicationSourceInterface initialization failed for this peer " +
- "and that replication on this peer may not be caught up. peerId=" + id);
- }
- for (ReplicationSourceInterface toRemove : srcToRemove) {
- toRemove.terminate(terminateMessage);
- closeQueue(toRemove);
- }
- deleteSource(id, true);
- }
- // Remove HFile Refs znode from zookeeper
- abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id));
- }
-
- @Override
- public void regionServerRemoved(String regionserver) {
- transferQueues(ServerName.valueOf(regionserver));
- }
-
- /**
- * Class responsible to setup new ReplicationSources to take care of the
- * queues from dead region servers.
+ * Class responsible for setting up new ReplicationSources to take care of the queues from dead
+ * region servers.
*/
class NodeFailoverWorker extends Thread {
@@ -643,10 +646,10 @@ public class ReplicationSourceManager implements ReplicationListener {
}
Map<String, Set<String>> newQueues = new HashMap<>();
try {
- List<String> peers = queueStorage.getAllQueues(deadRS);
- while (!peers.isEmpty()) {
+ List<String> queues = queueStorage.getAllQueues(deadRS);
+ while (!queues.isEmpty()) {
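+ // Claim a random queue from the dead RS so its queues spread across the live region servers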
Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
- peers.get(ThreadLocalRandom.current().nextInt(peers.size())), server.getServerName());
+ queues.get(ThreadLocalRandom.current().nextInt(queues.size())), server.getServerName());
long sleep = sleepBeforeFailover / 2;
if (!peer.getSecond().isEmpty()) {
newQueues.put(peer.getFirst(), peer.getSecond());
@@ -658,9 +661,9 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.warn("Interrupted while waiting before transferring a queue.");
Thread.currentThread().interrupt();
}
- peers = queueStorage.getAllQueues(deadRS);
+ queues = queueStorage.getAllQueues(deadRS);
}
- if (!peers.isEmpty()) {
+ if (queues.isEmpty()) {
queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
}
} catch (ReplicationException e) {
@@ -675,18 +678,18 @@ public class ReplicationSourceManager implements ReplicationListener {
}
for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
- String peerId = entry.getKey();
+ String queueId = entry.getKey();
Set<String> walsSet = entry.getValue();
try {
// there is no actual peer defined corresponding to the peerId for the failover.
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
String actualPeerId = replicationQueueInfo.getPeerId();
ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
if (peer == null) {
- LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
- + ", peer is null");
- abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
+ LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS +
+ ", peer is null");
+ abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
continue;
}
if (server instanceof ReplicationSyncUp.DummyServer
@@ -698,7 +701,7 @@ public class ReplicationSourceManager implements ReplicationListener {
}
// track sources in walsByIdRecoveredQueues
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
- walsByIdRecoveredQueues.put(peerId, walsByGroup);
+ walsByIdRecoveredQueues.put(queueId, walsByGroup);
for (String wal : walsSet) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
SortedSet<String> wals = walsByGroup.get(walPrefix);
@@ -709,14 +712,12 @@ public class ReplicationSourceManager implements ReplicationListener {
wals.add(wal);
}
- // enqueue sources
- ReplicationSourceInterface src = getReplicationSource(peerId, peer);
+ ReplicationSourceInterface src = createSource(queueId, peer);
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
- // see removePeer
synchronized (oldsources) {
if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
src.terminate("Recovered queue doesn't belong to any current peer");
- closeRecoveredQueue(src);
+ removeRecoveredSource(src);
continue;
}
oldsources.add(src);
@@ -734,6 +735,82 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
+ * Terminate the replication on this region server
+ */
+ public void join() {
+ this.executor.shutdown();
+ for (ReplicationSourceInterface source : this.sources.values()) {
+ source.terminate("Region server is closing");
+ }
+ }
+
+ /**
+ * Get a copy of the wals of the normal sources on this rs
+ * @return a map from peer id to wal group to a sorted set of wal names
+ */
+ @VisibleForTesting
+ Map<String, Map<String, SortedSet<String>>> getWALs() {
+ return Collections.unmodifiableMap(walsById);
+ }
+
+ /**
+ * Get a copy of the wals of the recovered sources on this rs
+ * @return a map from queue id to wal group to a sorted set of wal names
+ */
+ @VisibleForTesting
+ Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
+ return Collections.unmodifiableMap(walsByIdRecoveredQueues);
+ }
+
+ /**
+ * Get a list of all the normal sources of this rs
+ * @return list of all normal sources
+ */
+ public List<ReplicationSourceInterface> getSources() {
+ return new ArrayList<>(this.sources.values());
+ }
+
+ /**
+ * Get a list of all the recovered sources of this rs
+ * @return list of all recovered sources
+ */
+ public List<ReplicationSourceInterface> getOldSources() {
+ return this.oldsources;
+ }
+
+ /**
+ * Get the normal source for a given peer
+ * @return the normal source for the given peer if it exists, otherwise null.
+ */
+ @VisibleForTesting
+ public ReplicationSourceInterface getSource(String peerId) {
+ return this.sources.get(peerId);
+ }
+
+ @VisibleForTesting
+ List<String> getAllQueues() throws IOException {
+ List<String> allQueues = Collections.emptyList();
+ try {
+ allQueues = queueStorage.getAllQueues(server.getServerName());
+ } catch (ReplicationException e) {
+ throw new IOException(e);
+ }
+ return allQueues;
+ }
+
+ @VisibleForTesting
+ int getSizeOfLatestPath() {
+ synchronized (latestPaths) {
+ return latestPaths.size();
+ }
+ }
+
+ @VisibleForTesting
+ public AtomicLong getTotalBufferUsed() {
+ return totalBufferUsed;
+ }
+
+ /**
* Get the directory where wals are archived
* @return the directory where wals are archived
*/
@@ -761,28 +838,30 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get the ReplicationPeers used by this ReplicationSourceManager
* @return the ReplicationPeers used by this ReplicationSourceManager
*/
- public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
+ public ReplicationPeers getReplicationPeers() {
+ return this.replicationPeers;
+ }
/**
* Get a string representation of all the sources' metrics
*/
public String getStats() {
StringBuilder stats = new StringBuilder();
- for (ReplicationSourceInterface source : sources) {
+ for (ReplicationSourceInterface source : this.sources.values()) {
stats.append("Normal source for cluster " + source.getPeerId() + ": ");
stats.append(source.getStats() + "\n");
}
for (ReplicationSourceInterface oldSource : oldsources) {
- stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId()+": ");
- stats.append(oldSource.getStats()+ "\n");
+ stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
+ stats.append(oldSource.getStats() + "\n");
}
return stats.toString();
}
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
- throws ReplicationException {
- for (ReplicationSourceInterface source : this.sources) {
- source.addHFileRefs(tableName, family, pairs);
+ throws IOException {
+ for (ReplicationSourceInterface source : this.sources.values()) {
+ throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index ced2980..959f676 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -223,15 +223,15 @@ public class ReplicationSourceShipper extends Thread {
}
protected void updateLogPosition(long lastReadPosition) {
- source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
- lastReadPosition, false, false);
+ source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
+ lastReadPosition, false);
lastLoggedPosition = lastReadPosition;
}
public void startup(UncaughtExceptionHandler handler) {
String name = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
- + source.getPeerClusterZnode(), handler);
+ + source.getQueueId(), handler);
}
public PriorityBlockingQueue<Path> getLogQueue() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index b6b50ad..579d20f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -111,7 +111,7 @@ public class ReplicationSourceWALReader extends Thread {
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
- LOG.info("peerClusterZnode=" + source.getPeerClusterZnode()
+ LOG.info("peerClusterZnode=" + source.getQueueId()
+ ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
+ ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
http://git-wip-us.apache.org/repos/asf/hbase/blob/00336214/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 38ec598..ff20ddc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -89,7 +89,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
- public String getPeerClusterZnode() {
+ public String getQueueId() {
return peerClusterId;
}