You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/02 02:59:00 UTC
[10/43] hbase git commit: HBASE-20475 Fix the flaky TestReplicationDroppedTables unit test - addendum
HBASE-20475 Fix the flaky TestReplicationDroppedTables unit test - addendum
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e9a278ad
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e9a278ad
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e9a278ad
Branch: refs/heads/HBASE-19064
Commit: e9a278adc617a41ad3eefdd419e7618afee6b2b3
Parents: 39cf42b
Author: huzheng <op...@gmail.com>
Authored: Fri Apr 27 16:40:53 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Fri Apr 27 21:38:15 2018 +0800
----------------------------------------------------------------------
.../hbase/replication/ReplicationPeers.java | 5 ++++
.../regionserver/ReplicationSourceManager.java | 28 +++++++++++++++-----
2 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e9a278ad/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index eacb2f4..4d602ca 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication;
import java.util.Collections;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -103,6 +104,10 @@ public class ReplicationPeers {
return Collections.unmodifiableSet(peerCache.keySet());
}
+ public Map<String, ReplicationPeerImpl> getPeerCache() {
+ return Collections.unmodifiableMap(peerCache);
+ }
+
public PeerState refreshPeerState(String peerId) throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId);
if (peer == null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/e9a278ad/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3ecc50a..70cd986 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -627,11 +628,25 @@ public class ReplicationSourceManager implements ReplicationListener {
class NodeFailoverWorker extends Thread {
private final ServerName deadRS;
+ // After claiming the queues from the dead region server, the NodeFailoverWorker skips starting
+ // the RecoveredReplicationSource if the peer has been removed. However, a peer (e.g. peerId = 2)
+ // may be removed and a new peer with the same peerId added while the NodeFailoverWorker is
+ // running. So we keep a shallow copy of the <peerId, peer> map, taken when the worker is
+ // created, to decide whether to start the RecoveredReplicationSource. If the current peer is
+ // not the same instance as the one in that copy, we skip starting the source; starting it
+ // anyway would make the rs abort (see HBASE-20475).
+ private final Map<String, ReplicationPeerImpl> peersSnapshot;
@VisibleForTesting
public NodeFailoverWorker(ServerName deadRS) {
super("Failover-for-" + deadRS);
this.deadRS = deadRS;
+ peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
+ }
+
+ private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
+ ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
+ return oldPeerRef != null && oldPeerRef == newPeerRef;
}
@Override
@@ -691,16 +706,16 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
String actualPeerId = replicationQueueInfo.getPeerId();
- ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
- if (peer == null) {
- LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS +
- ", peer is null");
+ ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
+ if (peer == null || !isOldPeer(actualPeerId, peer)) {
+ LOG.warn("Skipping failover for peer {} of node {}, peer is null", actualPeerId,
+ deadRS);
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
continue;
}
if (server instanceof ReplicationSyncUp.DummyServer
&& peer.getPeerState().equals(PeerState.DISABLED)) {
- LOG.warn("Peer {} is disbaled. ReplicationSyncUp tool will skip "
+ LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
+ "replicating data to this peer.",
actualPeerId);
continue;
@@ -721,7 +736,8 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationSourceInterface src = createSource(queueId, peer);
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
synchronized (oldsources) {
- if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
+ peer = replicationPeers.getPeer(src.getPeerId());
+ if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
src.terminate("Recovered queue doesn't belong to any current peer");
removeRecoveredSource(src);
continue;