You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/18 05:29:36 UTC
[40/50] [abbrv] hbase git commit: HBASE-19636 All rs should already
start work with the new peer change when replication peer procedure is
finished
HBASE-19636 All rs should already start work with the new peer change when replication peer procedure is finished
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9327dfda
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9327dfda
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9327dfda
Branch: refs/heads/HBASE-19397-branch-2
Commit: 9327dfda3576f72eef036aab211acf90920036bd
Parents: fbf15bc
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu Jan 4 16:58:01 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Jan 18 13:27:54 2018 +0800
----------------------------------------------------------------------
.../replication/ReplicationPeerConfig.java | 1 -
.../hbase/replication/ReplicationPeerImpl.java | 4 +-
.../hbase/replication/ReplicationQueueInfo.java | 23 +-
.../hbase/replication/ReplicationUtils.java | 56 ++
.../replication/TestReplicationStateZKImpl.java | 22 -
.../regionserver/ReplicationSourceService.java | 3 +-
.../regionserver/PeerProcedureHandler.java | 3 +
.../regionserver/PeerProcedureHandlerImpl.java | 50 +-
.../RecoveredReplicationSource.java | 6 +-
.../RecoveredReplicationSourceShipper.java | 8 +-
.../replication/regionserver/Replication.java | 15 +-
.../regionserver/ReplicationSource.java | 34 +-
.../regionserver/ReplicationSourceFactory.java | 4 +-
.../ReplicationSourceInterface.java | 8 +-
.../regionserver/ReplicationSourceManager.java | 895 ++++++++++---------
.../regionserver/ReplicationSourceShipper.java | 6 +-
.../ReplicationSourceWALReader.java | 2 +-
.../replication/ReplicationSourceDummy.java | 2 +-
.../replication/TestNamespaceReplication.java | 57 +-
.../TestReplicationSourceManager.java | 11 +-
.../TestReplicationSourceManagerZkImpl.java | 1 -
21 files changed, 659 insertions(+), 552 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index fdae288..bf8d030 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index 3e17025..604e0bb 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,6 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class ReplicationPeerImpl implements ReplicationPeer {
+
private final Configuration conf;
private final String id;
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
index ecd888f..cd65f9b 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ServerName;
/**
- * This class is responsible for the parsing logic for a znode representing a queue.
+ * This class is responsible for the parsing logic for a queue id representing a queue.
* It will extract the peerId if it's recovered as well as the dead region servers
* that were part of the queue's history.
*/
@@ -38,21 +38,20 @@ public class ReplicationQueueInfo {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueInfo.class);
private final String peerId;
- private final String peerClusterZnode;
+ private final String queueId;
private boolean queueRecovered;
// List of all the dead region servers that had this queue (if recovered)
private List<ServerName> deadRegionServers = new ArrayList<>();
/**
- * The passed znode will be either the id of the peer cluster or
- * the handling story of that queue in the form of id-servername-*
+ * The passed queueId will be either the id of the peer or the handling story of that queue
+ * in the form of id-servername-*
*/
- public ReplicationQueueInfo(String znode) {
- this.peerClusterZnode = znode;
- String[] parts = znode.split("-", 2);
+ public ReplicationQueueInfo(String queueId) {
+ this.queueId = queueId;
+ String[] parts = queueId.split("-", 2);
this.queueRecovered = parts.length != 1;
- this.peerId = this.queueRecovered ?
- parts[0] : peerClusterZnode;
+ this.peerId = this.queueRecovered ? parts[0] : queueId;
if (parts.length >= 2) {
// extract dead servers
extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
@@ -60,7 +59,7 @@ public class ReplicationQueueInfo {
}
/**
- * Parse dead server names from znode string servername can contain "-" such as
+ * Parse dead server names from queue id. servername can contain "-" such as
* "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
* cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
*/
@@ -119,8 +118,8 @@ public class ReplicationQueueInfo {
return this.peerId;
}
- public String getPeerClusterZnode() {
- return this.peerClusterZnode;
+ public String getQueueId() {
+ return this.queueId;
}
public boolean isQueueRecovered() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 7b676ca..ebe68a7 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -18,11 +18,15 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -66,4 +70,56 @@ public final class ReplicationUtils {
queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
}
}
+
+ private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) {
+ if (c1 == null) {
+ return c2 == null;
+ }
+ if (c2 == null) {
+ return false;
+ }
+ return c1.size() == c2.size() && c1.containsAll(c2);
+ }
+
+ private static boolean isNamespacesEqual(Set<String> ns1, Set<String> ns2) {
+ return isCollectionEqual(ns1, ns2);
+ }
+
+ private static boolean isTableCFsEqual(Map<TableName, List<String>> tableCFs1,
+ Map<TableName, List<String>> tableCFs2) {
+ if (tableCFs1 == null) {
+ return tableCFs2 == null;
+ }
+ if (tableCFs2 == null) {
+ return false;
+ }
+ if (tableCFs1.size() != tableCFs2.size()) {
+ return false;
+ }
+ for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) {
+ TableName table = entry1.getKey();
+ if (!tableCFs2.containsKey(table)) {
+ return false;
+ }
+ List<String> cfs1 = entry1.getValue();
+ List<String> cfs2 = tableCFs2.get(table);
+ if (!isCollectionEqual(cfs1, cfs2)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) {
+ if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
+ return false;
+ }
+ if (rpc1.replicateAllUserTables()) {
+ return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
+ isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
+ } else {
+ return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces()) &&
+ isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 6825c36..2790bd0 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@@ -37,14 +35,10 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
- private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class);
-
private static Configuration conf;
private static HBaseZKTestingUtility utility;
private static ZKWatcher zkw;
@@ -92,20 +86,4 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
public static void tearDownAfterClass() throws Exception {
utility.shutdownMiniZKCluster();
}
-
- private static class WarnOnlyAbortable implements Abortable {
-
- @Override
- public void abort(String why, Throwable e) {
- LOG.warn("TestReplicationStateZKImpl received abort, ignoring. Reason: " + why);
- if (LOG.isDebugEnabled()) {
- LOG.debug(e.toString(), e);
- }
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index a82fa3d..2aef0a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
index b392985..65da9af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
@@ -23,6 +23,9 @@ import java.io.IOException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
+/**
+ * A handler for modifying replication peer in peer procedures.
+ */
@InterfaceAudience.Private
public interface PeerProcedureHandler {
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index c09c6a0..ce8fdae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -15,21 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
- private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
private final ReplicationSourceManager replicationSourceManager;
private final KeyLocker<String> peersLock = new KeyLocker<>();
@@ -39,7 +38,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
}
@Override
- public void addPeer(String peerId) throws ReplicationException, IOException {
+ public void addPeer(String peerId) throws IOException {
Lock peerLock = peersLock.acquireLock(peerId);
try {
replicationSourceManager.addPeer(peerId);
@@ -49,7 +48,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
}
@Override
- public void removePeer(String peerId) throws ReplicationException, IOException {
+ public void removePeer(String peerId) throws IOException {
Lock peerLock = peersLock.acquireLock(peerId);
try {
if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
@@ -60,35 +59,50 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
}
}
- @Override
- public void disablePeer(String peerId) throws ReplicationException, IOException {
+ private void refreshPeerState(String peerId) throws ReplicationException, IOException {
PeerState newState;
Lock peerLock = peersLock.acquireLock(peerId);
try {
+ ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+ if (peer == null) {
+ throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+ }
+ PeerState oldState = peer.getPeerState();
newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
+ // RS need to start work with the new replication state change
+ if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
+ replicationSourceManager.refreshSources(peerId);
+ }
} finally {
peerLock.unlock();
}
- LOG.info("disable replication peer, id: {}, new state: {}", peerId, newState);
}
@Override
public void enablePeer(String peerId) throws ReplicationException, IOException {
- PeerState newState;
- Lock peerLock = peersLock.acquireLock(peerId);
- try {
- newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
- } finally {
- peerLock.unlock();
- }
- LOG.info("enable replication peer, id: {}, new state: {}", peerId, newState);
+ refreshPeerState(peerId);
+ }
+
+ @Override
+ public void disablePeer(String peerId) throws ReplicationException, IOException {
+ refreshPeerState(peerId);
}
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
Lock peerLock = peersLock.acquireLock(peerId);
try {
- replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
+ ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+ if (peer == null) {
+ throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+ }
+ ReplicationPeerConfig oldConfig = peer.getPeerConfig();
+ ReplicationPeerConfig newConfig =
+ replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
+ // RS need to start work with the new replication config change
+ if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
+ replicationSourceManager.refreshSources(peerId);
+ }
} finally {
peerLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 7bceb78..1be9a88 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -81,7 +81,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
conf, queue, startPosition, walEntryFilter, this);
Threads.setDaemonThreadRunning(walReader, threadName
- + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
+ + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
getUncaughtExceptionHandler());
return walReader;
}
@@ -178,8 +178,8 @@ public class RecoveredReplicationSource extends ReplicationSource {
}
}
if (allTasksDone) {
- manager.closeRecoveredQueue(this);
- LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
+ manager.removeRecoveredSource(this);
+ LOG.info("Finished recovering queue " + queueId + " with the following stats: "
+ getStats());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index fb365bc..1e45496 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -77,7 +77,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
if (entryBatch.getWalEntries().isEmpty()
&& entryBatch.getLastSeqIds().isEmpty()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
- + source.getPeerClusterZnode());
+ + source.getQueueId());
source.getSourceMetrics().incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
continue;
@@ -114,7 +114,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
// normally has a position (unless the RS failed between 2 logs)
private long getRecoveredQueueStartPos() {
long startPosition = 0;
- String peerClusterZnode = source.getPeerClusterZnode();
+ String peerClusterZnode = source.getQueueId();
try {
startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
peerClusterZnode, this.queue.peek().getName());
@@ -130,8 +130,8 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
@Override
protected void updateLogPosition(long lastReadPosition) {
- source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
- lastReadPosition, true, false);
+ source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
+ lastReadPosition, true);
lastLoggedPosition = lastReadPosition;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index dca2439..d1a3266 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -59,10 +58,10 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
*/
@@ -218,11 +217,7 @@ public class Replication implements
* @throws IOException
*/
public void startReplicationService() throws IOException {
- try {
- this.replicationManager.init();
- } catch (ReplicationException e) {
- throw new IOException(e);
- }
+ this.replicationManager.init();
this.replicationSink = new ReplicationSink(this.conf, this.server);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
@@ -280,9 +275,9 @@ public class Replication implements
throws IOException {
try {
this.replicationManager.addHFileRefs(tableName, family, pairs);
- } catch (ReplicationException e) {
+ } catch (IOException e) {
LOG.error("Failed to add hfile references in the replication queue.", e);
- throw new IOException(e);
+ throw e;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 0b44ba4..6b622ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -106,7 +106,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// total number of edits we replicated
private AtomicLong totalReplicatedEdits = new AtomicLong(0);
// The znode we currently play with
- protected String peerClusterZnode;
+ protected String queueId;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Indicates if this particular source is running
@@ -142,14 +142,14 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* @param fs file system to use
* @param manager replication manager to ping to
* @param server the server for this region server
- * @param peerClusterZnode the name of our znode
+ * @param queueId the id of our replication queue
* @param clusterId unique UUID for the cluster
* @param metrics metrics for replication source
*/
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+ String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException {
this.server = server;
this.conf = HBaseConfiguration.create(conf);
@@ -168,8 +168,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.metrics = metrics;
this.clusterId = clusterId;
- this.peerClusterZnode = peerClusterZnode;
- this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
+ this.queueId = queueId;
+ this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
@@ -179,7 +179,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;
- LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+ LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
+ ", currentBandwidth=" + this.currentBandwidth);
}
@@ -217,12 +217,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
- String peerId = peerClusterZnode;
- if (peerId.contains("-")) {
- // peerClusterZnode will be in the form peerId + "-" + rsZNode.
- // A peerId will not have "-" in its name, see HBASE-11394
- peerId = peerClusterZnode.split("-")[0];
- }
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) {
List<String> tableCfs = tableCFMap.get(tableName);
@@ -311,7 +305,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
- this.manager.closeQueue(this);
+ this.manager.removeSource(this);
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
@@ -356,7 +350,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
ReplicationSourceWALReader walReader =
new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
- threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
+ threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
getUncaughtExceptionHandler());
}
@@ -450,7 +444,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
LOG.error("Unexpected exception in ReplicationSource", e);
}
};
- Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode,
+ Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId,
handler);
}
@@ -466,9 +460,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public void terminate(String reason, Exception cause, boolean join) {
if (cause == null) {
- LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason);
+ LOG.info("Closing source " + this.queueId + " because: " + reason);
} else {
- LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason,
+ LOG.error("Closing source " + this.queueId + " because an error occurred: " + reason,
cause);
}
this.sourceRunning = false;
@@ -491,7 +485,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
- + this.peerClusterZnode,
+ + this.queueId,
te);
}
}
@@ -499,8 +493,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
@Override
- public String getPeerClusterZnode() {
- return this.peerClusterZnode;
+ public String getQueueId() {
+ return this.queueId;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 865a202..93e8331 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -32,8 +32,8 @@ public class ReplicationSourceFactory {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class);
- static ReplicationSourceInterface create(Configuration conf, String peerId) {
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+ static ReplicationSourceInterface create(Configuration conf, String queueId) {
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
ReplicationSourceInterface src;
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/9327dfda/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 4f10c73..d7cf9a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -51,7 +51,7 @@ public interface ReplicationSourceInterface {
*/
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+ String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException;
/**
@@ -96,11 +96,11 @@ public interface ReplicationSourceInterface {
Path getCurrentPath();
/**
- * Get the id that the source is replicating to
+ * Get the queue id that the source is replicating to
*
- * @return peer cluster id
+ * @return queue id
*/
- String getPeerClusterZnode();
+ String getQueueId();
/**
* Get the id that the source is replicating to.