You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/02 02:59:00 UTC

[10/43] hbase git commit: HBASE-20475 Fix the flaky TestReplicationDroppedTables unit test - addendum

HBASE-20475 Fix the flaky TestReplicationDroppedTables unit test - addendum


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e9a278ad
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e9a278ad
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e9a278ad

Branch: refs/heads/HBASE-19064
Commit: e9a278adc617a41ad3eefdd419e7618afee6b2b3
Parents: 39cf42b
Author: huzheng <op...@gmail.com>
Authored: Fri Apr 27 16:40:53 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Fri Apr 27 21:38:15 2018 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationPeers.java     |  5 ++++
 .../regionserver/ReplicationSourceManager.java  | 28 +++++++++++++++-----
 2 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e9a278ad/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index eacb2f4..4d602ca 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -103,6 +104,10 @@ public class ReplicationPeers {
     return Collections.unmodifiableSet(peerCache.keySet());
   }
 
+  public Map<String, ReplicationPeerImpl> getPeerCache() {
+    return Collections.unmodifiableMap(peerCache);
+  }
+
   public PeerState refreshPeerState(String peerId) throws ReplicationException {
     ReplicationPeerImpl peer = peerCache.get(peerId);
     if (peer == null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e9a278ad/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3ecc50a..70cd986 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -627,11 +628,25 @@ public class ReplicationSourceManager implements ReplicationListener {
   class NodeFailoverWorker extends Thread {
 
     private final ServerName deadRS;
+    // After claiming the queues from the dead region server, the NodeFailoverWorker skips
+    // starting the RecoveredReplicationSource if the peer has been removed. However, it is
+    // possible that a peer with peerId = 2 is removed and a new peer with peerId = 2 is added
+    // again while the NodeFailoverWorker is running. So we keep a snapshot copy of the
+    // <peerId, peer> map to decide whether we should start the RecoveredReplicationSource. If the
+    // latest peer is not the same peer as when the NodeFailoverWorker began, we skip starting the
+    // RecoveredReplicationSource; otherwise the rs will abort (see HBASE-20475).
+    private final Map<String, ReplicationPeerImpl> peersSnapshot;
 
     @VisibleForTesting
     public NodeFailoverWorker(ServerName deadRS) {
       super("Failover-for-" + deadRS);
       this.deadRS = deadRS;
+      peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
+    }
+
+    private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
+      ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
+      return oldPeerRef != null && oldPeerRef == newPeerRef;
     }
 
     @Override
@@ -691,16 +706,16 @@ public class ReplicationSourceManager implements ReplicationListener {
           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
           String actualPeerId = replicationQueueInfo.getPeerId();
 
-          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
-          if (peer == null) {
-            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS +
-              ", peer is null");
+          ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
+          if (peer == null || !isOldPeer(actualPeerId, peer)) {
+            LOG.warn("Skipping failover for peer {} of node {}, peer is null", actualPeerId,
+              deadRS);
             abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
             continue;
           }
           if (server instanceof ReplicationSyncUp.DummyServer
               && peer.getPeerState().equals(PeerState.DISABLED)) {
-            LOG.warn("Peer {} is disbaled. ReplicationSyncUp tool will skip "
+            LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
                 + "replicating data to this peer.",
               actualPeerId);
             continue;
@@ -721,7 +736,8 @@ public class ReplicationSourceManager implements ReplicationListener {
           ReplicationSourceInterface src = createSource(queueId, peer);
           // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
           synchronized (oldsources) {
-            if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
+            peer = replicationPeers.getPeer(src.getPeerId());
+            if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
               src.terminate("Recovered queue doesn't belong to any current peer");
               removeRecoveredSource(src);
               continue;