You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/08/08 00:14:51 UTC

[1/6] hbase git commit: HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)

Repository: hbase
Updated Branches:
  refs/heads/0.98 725231c39 -> 3bf9957d9
  refs/heads/branch-1 832a61513 -> 68cb53d15
  refs/heads/branch-1.0 5d802f35c -> bc4498a26
  refs/heads/branch-1.1 007fa52fc -> e00caff3b
  refs/heads/branch-1.2 bb1407d94 -> 7abb12be2
  refs/heads/master f1c1692d5 -> ea8833fd6


HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)

Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3bf9957d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3bf9957d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3bf9957d

Branch: refs/heads/0.98
Commit: 3bf9957d9dfa59c87b6f920fdad256a59030a845
Parents: 725231c
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Aug 7 11:56:31 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Aug 7 14:40:13 2015 -0700

----------------------------------------------------------------------
 .../replication/ReplicationQueuesClient.java    | 19 ++++--
 .../ReplicationQueuesClientZKImpl.java          | 17 +++++-
 .../replication/ReplicationQueuesZKImpl.java    |  4 +-
 .../master/ReplicationLogCleaner.java           | 61 +++++++++++++-------
 .../replication/TestReplicationStateBasic.java  |  3 +-
 .../TestReplicationSourceManager.java           | 37 +++++++++++-
 6 files changed, 110 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf9957d/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index 5c068be..ed0e561 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for clients of replication to view replication queues. These queues
@@ -39,21 +39,30 @@ public interface ReplicationQueuesClient {
    * Get a list of all region servers that have outstanding replication queues. These servers could
    * be alive, dead or from a previous run of the cluster.
    * @return a list of server names
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getListOfReplicators();
+  List<String> getListOfReplicators() throws KeeperException;
 
   /**
    * Get a list of all HLogs in the given queue on the given region server.
    * @param serverName the server name of the region server that owns the queue
    * @param queueId a String that identifies the queue
-   * @return a list of HLogs, null if this region server is dead and has no outstanding queues
+   * @return a list of WALs, null if this region server is dead and has no outstanding queues
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getLogsInQueue(String serverName, String queueId);
+  List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
 
   /**
    * Get a list of all queues for the specified region server.
    * @param serverName the server name of the region server that owns the set of queues
    * @return a list of queueIds, null if this region server is not a replicator.
    */
-  List<String> getAllQueues(String serverName);
+  List<String> getAllQueues(String serverName) throws KeeperException;
+
+  /**
+   * Get the cversion of replication rs node. This can be used as optimistic locking to get a
+   * consistent snapshot of the replication queues.
+   * @return cversion of replication rs node
+   */
+  int getQueuesZNodeCversion() throws KeeperException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf9957d/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index 3bc4f48..581327b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 
 @InterfaceAudience.Private
 public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
@@ -46,7 +47,7 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
   }
 
   @Override
-  public List<String> getLogsInQueue(String serverName, String queueId) {
+  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     znode = ZKUtil.joinZNode(znode, queueId);
     List<String> result = null;
@@ -55,20 +56,32 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId
           + " and serverName=" + serverName, e);
+      throw e;
     }
     return result;
   }
 
   @Override
-  public List<String> getAllQueues(String serverName) {
+  public List<String> getAllQueues(String serverName) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     List<String> result = null;
     try {
       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
+      throw e;
     }
     return result;
   }
 
+  @Override public int getQueuesZNodeCversion() throws KeeperException {
+    try {
+      Stat stat = new Stat();
+      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
+      return stat.getCversion();
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get stat of replication rs node", e);
+      throw e;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf9957d/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 4ee75bf..949efd5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -331,7 +331,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
         // add delete op for peer
         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
       }
-      // add delete op for dead rs
+      // add delete op for dead rs, this will update the cversion of the parent.
+      // The reader will make optimistic locking with this to get a consistent
+      // snapshot
       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
       LOG.debug(" The multi list size is: " + listOfOps.size());
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf9957d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 8f099d7..32b5536 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -39,6 +40,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
@@ -61,12 +63,20 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
       return files;
     }
 
-    final Set<String> hlogs = loadHLogsFromQueues();
+    final Set<String> wals;
+    try {
+      // The concurrently created new WALs may not be included in the return list,
+      // but they won't be deleted because they're not in the checking set.
+      wals = loadWALsFromQueues();
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read zookeeper, skipping checking deletable files");
+      return Collections.emptyList();
+    }
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
       public boolean apply(FileStatus file) {
         String hlog = file.getPath().getName();
-        boolean logInReplicationQueue = hlogs.contains(hlog);
+        boolean logInReplicationQueue = wals.contains(hlog);
         if (LOG.isDebugEnabled()) {
           if (logInReplicationQueue) {
             LOG.debug("Found log in ZK, keeping: " + hlog);
@@ -79,29 +89,40 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
   }
 
   /**
-   * Load all hlogs in all replication queues from ZK
+   * Load all wals in all replication queues from ZK. This method guarantees to return a
+   * snapshot which contains all WALs in the zookeeper at the start of this call even there
+   * is concurrent queue failover. However, some newly created WALs during the call may
+   * not be included.
    */
-  private Set<String> loadHLogsFromQueues() {
-    List<String> rss = replicationQueues.getListOfReplicators();
-    if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
-      return ImmutableSet.of();
-    }
-    Set<String> hlogs = Sets.newHashSet();
-    for (String rs: rss) {
-      List<String> listOfPeers = replicationQueues.getAllQueues(rs);
-      // if rs just died, this will be null
-      if (listOfPeers == null) {
-        continue;
+  private Set<String> loadWALsFromQueues() throws KeeperException {
+    int v0 = replicationQueues.getQueuesZNodeCversion();
+    for (int retry = 0; ; retry++) {
+      List<String> rss = replicationQueues.getListOfReplicators();
+      if (rss == null) {
+        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+        return ImmutableSet.of();
       }
-      for (String id : listOfPeers) {
-        List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
-        if (peersHlogs != null) {
-          hlogs.addAll(peersHlogs);
+      Set<String> wals = Sets.newHashSet();
+      for (String rs : rss) {
+        List<String> listOfPeers = replicationQueues.getAllQueues(rs);
+        // if rs just died, this will be null
+        if (listOfPeers == null) {
+          continue;
         }
+        for (String id : listOfPeers) {
+          List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
+          if (peersWals != null) {
+            wals.addAll(peersWals);
+          }
+        }
+      }
+      int v1 = replicationQueues.getQueuesZNodeCversion();
+      if (v0 == v1) {
+        return wals;
       }
+      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
+          v0, v1, retry));
     }
-    return hlogs;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf9957d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index e560620..f05eceb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -66,7 +67,7 @@ public abstract class TestReplicationStateBasic {
   }
 
   @Test
-  public void testReplicationQueuesClient() throws ReplicationException {
+  public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
     rqc.init();
     // Test methods with empty state
     assertEquals(0, rqc.getListOfReplicators().size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf9957d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index d21c3cb..2d86cd8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -358,8 +359,40 @@ public class TestReplicationSourceManager {
 
     server.abort("", null);
   }
-  
-  
+
+  @Test
+  public void testFailoverDeadServerCversionChange() throws Exception {
+    LOG.debug("testFailoverDeadServerCversionChange");
+
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server s0 = new DummyServer("cversion-change0.example.org");
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
+    repQueues.init(s0.getServerName().toString());
+    // populate some znodes in the peer znode
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+    // simulate queue transfer
+    Server s1 = new DummyServer("cversion-change1.example.org");
+    ReplicationQueues rq1 =
+        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rq1.init(s1.getServerName().toString());
+
+    ReplicationQueuesClient client =
+        ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
+
+    int v0 = client.getQueuesZNodeCversion();
+    rq1.claimQueues(s0.getServerName().getServerName());
+    int v1 = client.getQueuesZNodeCversion();
+    // cversion should increased by 1 since a child node is deleted
+    assertEquals(v0 + 1, v1);
+
+    s0.abort("", null);
+  }
+
   static class DummyNodeFailoverWorker extends Thread {
     private SortedMap<String, SortedSet<String>> logZnodesMap;
     Server server;


[3/6] hbase git commit: HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)

Posted by ap...@apache.org.
HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e00caff3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e00caff3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e00caff3

Branch: refs/heads/branch-1.1
Commit: e00caff3b301d178dfa9b3fbe8bfafd74dc1250e
Parents: 007fa52
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Aug 7 15:08:27 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Aug 7 15:08:27 2015 -0700

----------------------------------------------------------------------
 .../replication/ReplicationQueuesClient.java    | 17 ++++--
 .../ReplicationQueuesClientZKImpl.java          | 17 +++++-
 .../replication/ReplicationQueuesZKImpl.java    |  4 +-
 .../master/ReplicationLogCleaner.java           | 59 +++++++++++++-------
 .../replication/TestReplicationStateBasic.java  |  3 +-
 .../TestReplicationSourceManager.java           | 33 +++++++++++
 6 files changed, 106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e00caff3/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index fed1791..5b3e541 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for clients of replication to view replication queues. These queues
@@ -39,21 +39,30 @@ public interface ReplicationQueuesClient {
    * Get a list of all region servers that have outstanding replication queues. These servers could
    * be alive, dead or from a previous run of the cluster.
    * @return a list of server names
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getListOfReplicators();
+  List<String> getListOfReplicators() throws KeeperException;
 
   /**
    * Get a list of all WALs in the given queue on the given region server.
    * @param serverName the server name of the region server that owns the queue
    * @param queueId a String that identifies the queue
    * @return a list of WALs, null if this region server is dead and has no outstanding queues
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getLogsInQueue(String serverName, String queueId);
+  List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
 
   /**
    * Get a list of all queues for the specified region server.
    * @param serverName the server name of the region server that owns the set of queues
    * @return a list of queueIds, null if this region server is not a replicator.
    */
-  List<String> getAllQueues(String serverName);
+  List<String> getAllQueues(String serverName) throws KeeperException;
+
+  /**
+   * Get the cversion of replication rs node. This can be used as optimistic locking to get a
+   * consistent snapshot of the replication queues.
+   * @return cversion of replication rs node
+   */
+  int getQueuesZNodeCversion() throws KeeperException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e00caff3/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index 43262a0..e1a6a49 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 
 @InterfaceAudience.Private
 public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
@@ -46,7 +47,7 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
   }
 
   @Override
-  public List<String> getLogsInQueue(String serverName, String queueId) {
+  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     znode = ZKUtil.joinZNode(znode, queueId);
     List<String> result = null;
@@ -55,20 +56,32 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of wals for queueId=" + queueId
           + " and serverName=" + serverName, e);
+      throw e;
     }
     return result;
   }
 
   @Override
-  public List<String> getAllQueues(String serverName) {
+  public List<String> getAllQueues(String serverName) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     List<String> result = null;
     try {
       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
+      throw e;
     }
     return result;
   }
 
+  @Override public int getQueuesZNodeCversion() throws KeeperException {
+    try {
+      Stat stat = new Stat();
+      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
+      return stat.getCversion();
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get stat of replication rs node", e);
+      throw e;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e00caff3/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 6a30511..b1d434d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -334,7 +334,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
         // add delete op for peer
         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
       }
-      // add delete op for dead rs
+      // add delete op for dead rs, this will update the cversion of the parent.
+      // The reader will make optimistic locking with this to get a consistent
+      // snapshot
       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
       LOG.debug(" The multi list size is: " + listOfOps.size());
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e00caff3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 525b7ad..474f497 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -39,6 +40,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
@@ -61,7 +63,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
       return files;
     }
 
-    final Set<String> wals = loadWALsFromQueues();
+    final Set<String> wals;
+    try {
+      // The concurrently created new WALs may not be included in the return list,
+      // but they won't be deleted because they're not in the checking set.
+      wals = loadWALsFromQueues();
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read zookeeper, skipping checking deletable files");
+      return Collections.emptyList();
+    }
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
       public boolean apply(FileStatus file) {
@@ -79,29 +89,40 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
   }
 
   /**
-   * Load all wals in all replication queues from ZK
+   * Load all wals in all replication queues from ZK. This method guarantees to return a
+   * snapshot which contains all WALs in the zookeeper at the start of this call even there
+   * is concurrent queue failover. However, some newly created WALs during the call may
+   * not be included.
    */
-  private Set<String> loadWALsFromQueues() {
-    List<String> rss = replicationQueues.getListOfReplicators();
-    if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
-      return ImmutableSet.of();
-    }
-    Set<String> wals = Sets.newHashSet();
-    for (String rs: rss) {
-      List<String> listOfPeers = replicationQueues.getAllQueues(rs);
-      // if rs just died, this will be null
-      if (listOfPeers == null) {
-        continue;
+  private Set<String> loadWALsFromQueues() throws KeeperException {
+    int v0 = replicationQueues.getQueuesZNodeCversion();
+    for (int retry = 0; ; retry++) {
+      List<String> rss = replicationQueues.getListOfReplicators();
+      if (rss == null) {
+        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+        return ImmutableSet.of();
       }
-      for (String id : listOfPeers) {
-        List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
-        if (peersWals != null) {
-          wals.addAll(peersWals);
+      Set<String> wals = Sets.newHashSet();
+      for (String rs : rss) {
+        List<String> listOfPeers = replicationQueues.getAllQueues(rs);
+        // if rs just died, this will be null
+        if (listOfPeers == null) {
+          continue;
         }
+        for (String id : listOfPeers) {
+          List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
+          if (peersWals != null) {
+            wals.addAll(peersWals);
+          }
+        }
+      }
+      int v1 = replicationQueues.getQueuesZNodeCversion();
+      if (v0 == v1) {
+        return wals;
       }
+      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
+          v0, v1, retry));
     }
-    return wals;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e00caff3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index e560620..f05eceb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -66,7 +67,7 @@ public abstract class TestReplicationStateBasic {
   }
 
   @Test
-  public void testReplicationQueuesClient() throws ReplicationException {
+  public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
     rqc.init();
     // Test methods with empty state
     assertEquals(0, rqc.getListOfReplicators().size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/e00caff3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9ec0390..bb8e7bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -367,6 +368,38 @@ public class TestReplicationSourceManager {
     server.abort("", null);
   }
 
+  @Test
+  public void testFailoverDeadServerCversionChange() throws Exception {
+    LOG.debug("testFailoverDeadServerCversionChange");
+    // Claiming a dead server's queues must bump the cversion of the queues znode by one.
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server s0 = new DummyServer("cversion-change0.example.org");
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
+    repQueues.init(s0.getServerName().toString());
+    // populate s0's queue for peer "1" with WAL znodes
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+    // s1 takes over s0's queues below, simulating a failover queue transfer
+    Server s1 = new DummyServer("cversion-change1.example.org");
+    ReplicationQueues rq1 =
+        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rq1.init(s1.getServerName().toString());
+
+    ReplicationQueuesClient client =
+        ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
+
+    int v0 = client.getQueuesZNodeCversion();
+    rq1.claimQueues(s0.getServerName().getServerName());
+    int v1 = client.getQueuesZNodeCversion();
+    // cversion should be incremented by exactly 1: claiming deletes s0's child znode
+    assertEquals(v0 + 1, v1);
+
+    s0.abort("", null);
+  }
 
   static class DummyNodeFailoverWorker extends Thread {
     private SortedMap<String, SortedSet<String>> logZnodesMap;


[5/6] hbase git commit: HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)

Posted by ap...@apache.org.
HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/68cb53d1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/68cb53d1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/68cb53d1

Branch: refs/heads/branch-1
Commit: 68cb53d1512411e91c864b29da0a4f9fb1c3e69a
Parents: 832a615
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Aug 7 15:08:29 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Aug 7 15:08:29 2015 -0700

----------------------------------------------------------------------
 .../replication/ReplicationQueuesClient.java    | 17 ++++--
 .../ReplicationQueuesClientZKImpl.java          | 17 +++++-
 .../replication/ReplicationQueuesZKImpl.java    |  4 +-
 .../master/ReplicationLogCleaner.java           | 59 +++++++++++++-------
 .../replication/TestReplicationStateBasic.java  |  3 +-
 .../TestReplicationSourceManager.java           | 33 +++++++++++
 6 files changed, 106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/68cb53d1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index fed1791..5b3e541 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for clients of replication to view replication queues. These queues
@@ -39,21 +39,30 @@ public interface ReplicationQueuesClient {
    * Get a list of all region servers that have outstanding replication queues. These servers could
    * be alive, dead or from a previous run of the cluster.
    * @return a list of server names
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getListOfReplicators();
+  List<String> getListOfReplicators() throws KeeperException;
 
   /**
    * Get a list of all WALs in the given queue on the given region server.
    * @param serverName the server name of the region server that owns the queue
    * @param queueId a String that identifies the queue
    * @return a list of WALs, null if this region server is dead and has no outstanding queues
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getLogsInQueue(String serverName, String queueId);
+  List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
 
   /**
    * Get a list of all queues for the specified region server.
    * @param serverName the server name of the region server that owns the set of queues
    * @return a list of queueIds, null if this region server is not a replicator.
    */
-  List<String> getAllQueues(String serverName);
+  List<String> getAllQueues(String serverName) throws KeeperException;
+
+  /**
+   * Get the cversion of the replication rs znode, usable as an optimistic lock on the queues.
+   * @return cversion of the replication rs znode
+   * @throws KeeperException zookeeper exception
+   */
+  int getQueuesZNodeCversion() throws KeeperException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/68cb53d1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index 43262a0..e1a6a49 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 
 @InterfaceAudience.Private
 public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
@@ -46,7 +47,7 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
   }
 
   @Override
-  public List<String> getLogsInQueue(String serverName, String queueId) {
+  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     znode = ZKUtil.joinZNode(znode, queueId);
     List<String> result = null;
@@ -55,20 +56,32 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of wals for queueId=" + queueId
           + " and serverName=" + serverName, e);
+      throw e; // rethrow so callers can tell a failed read from a null result
     }
     return result;
   }
 
   @Override
-  public List<String> getAllQueues(String serverName) {
+  public List<String> getAllQueues(String serverName) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     List<String> result = null;
     try {
       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
+      throw e; // rethrow so callers can tell a failed read from a null result
     }
     return result;
   }
 
+  @Override public int getQueuesZNodeCversion() throws KeeperException {
+    try {
+      Stat stat = new Stat();
+      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); // only stat is used
+      return stat.getCversion();
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get stat of replication rs node", e);
+      throw e;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/68cb53d1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 0a6ba44..26ca6ba 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -334,7 +334,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
         // add delete op for peer
         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
       }
-      // add delete op for dead rs
+      // add delete op for dead rs; deleting this child znode bumps the parent's cversion,
+      // which readers compare before and after a scan (optimistic locking) to detect
+      // concurrent queue transfers and obtain a consistent snapshot
       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
       if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/68cb53d1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 525b7ad..474f497 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -39,6 +40,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
@@ -61,7 +63,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
       return files;
     }
 
-    final Set<String> wals = loadWALsFromQueues();
+    final Set<String> wals;
+    try {
+      // WALs created concurrently may be missing from the returned set, but they are not
+      // in the set of files being checked either, so they won't be deleted.
+      wals = loadWALsFromQueues();
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read zookeeper, skipping checking deletable files", e);
+      return Collections.emptyList(); // delete nothing while queue state is unknown
+    }
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
       public boolean apply(FileStatus file) {
@@ -79,29 +89,40 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
   }
 
   /**
-   * Load all wals in all replication queues from ZK
+   * Load all wals in all replication queues from ZK. The returned set is a consistent snapshot
+   * (the queues znode cversion did not change during the read), so it contains every WAL still
+   * queued even if a concurrent queue failover happens; the read is retried until it succeeds.
+   * However, some WALs created during the call may not be included.
    */
-  private Set<String> loadWALsFromQueues() {
-    List<String> rss = replicationQueues.getListOfReplicators();
-    if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
-      return ImmutableSet.of();
-    }
-    Set<String> wals = Sets.newHashSet();
-    for (String rs: rss) {
-      List<String> listOfPeers = replicationQueues.getAllQueues(rs);
-      // if rs just died, this will be null
-      if (listOfPeers == null) {
-        continue;
+  private Set<String> loadWALsFromQueues() throws KeeperException {
+    for (int retry = 0; ; retry++) {
+      int v0 = replicationQueues.getQueuesZNodeCversion(); // per attempt; stale v0 never matches
+      List<String> rss = replicationQueues.getListOfReplicators();
+      if (rss == null) {
+        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+        return ImmutableSet.of();
       }
-      for (String id : listOfPeers) {
-        List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
-        if (peersWals != null) {
-          wals.addAll(peersWals);
+      Set<String> wals = Sets.newHashSet();
+      for (String rs : rss) {
+        List<String> listOfPeers = replicationQueues.getAllQueues(rs);
+        // if rs just died, this will be null
+        if (listOfPeers == null) {
+          continue;
         }
+        for (String id : listOfPeers) {
+          List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
+          if (peersWals != null) {
+            wals.addAll(peersWals);
+          }
+        }
+      }
+      int v1 = replicationQueues.getQueuesZNodeCversion();
+      if (v0 == v1) {
+        return wals;
       }
+      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
+          v0, v1, retry));
     }
-    return wals;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/68cb53d1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index e560620..f05eceb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -66,7 +67,7 @@ public abstract class TestReplicationStateBasic {
   }
 
   @Test
-  public void testReplicationQueuesClient() throws ReplicationException {
+  public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
     rqc.init();
     // Test methods with empty state
     assertEquals(0, rqc.getListOfReplicators().size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/68cb53d1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9ec0390..bb8e7bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -367,6 +368,38 @@ public class TestReplicationSourceManager {
     server.abort("", null);
   }
 
+  @Test
+  public void testFailoverDeadServerCversionChange() throws Exception {
+    LOG.debug("testFailoverDeadServerCversionChange");
+    // Claiming a dead server's queues must bump the cversion of the queues znode by one.
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server s0 = new DummyServer("cversion-change0.example.org");
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
+    repQueues.init(s0.getServerName().toString());
+    // populate s0's queue for peer "1" with WAL znodes
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+    // s1 takes over s0's queues below, simulating a failover queue transfer
+    Server s1 = new DummyServer("cversion-change1.example.org");
+    ReplicationQueues rq1 =
+        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rq1.init(s1.getServerName().toString());
+
+    ReplicationQueuesClient client =
+        ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
+
+    int v0 = client.getQueuesZNodeCversion();
+    rq1.claimQueues(s0.getServerName().getServerName());
+    int v1 = client.getQueuesZNodeCversion();
+    // cversion should be incremented by exactly 1: claiming deletes s0's child znode
+    assertEquals(v0 + 1, v1);
+
+    s0.abort("", null);
+  }
 
   static class DummyNodeFailoverWorker extends Thread {
     private SortedMap<String, SortedSet<String>> logZnodesMap;


[2/6] hbase git commit: HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)

Posted by ap...@apache.org.
HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bc4498a2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bc4498a2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bc4498a2

Branch: refs/heads/branch-1.0
Commit: bc4498a2663ad230298c738b98f38adc594cb954
Parents: 5d802f3
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Aug 7 15:08:26 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Aug 7 15:08:26 2015 -0700

----------------------------------------------------------------------
 .../replication/ReplicationQueuesClient.java    | 17 ++++--
 .../ReplicationQueuesClientZKImpl.java          | 17 +++++-
 .../replication/ReplicationQueuesZKImpl.java    |  4 +-
 .../master/ReplicationLogCleaner.java           | 59 +++++++++++++-------
 .../replication/TestReplicationStateBasic.java  |  3 +-
 .../TestReplicationSourceManager.java           | 33 +++++++++++
 6 files changed, 106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/bc4498a2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index fed1791..5b3e541 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for clients of replication to view replication queues. These queues
@@ -39,21 +39,30 @@ public interface ReplicationQueuesClient {
    * Get a list of all region servers that have outstanding replication queues. These servers could
    * be alive, dead or from a previous run of the cluster.
    * @return a list of server names
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getListOfReplicators();
+  List<String> getListOfReplicators() throws KeeperException;
 
   /**
    * Get a list of all WALs in the given queue on the given region server.
    * @param serverName the server name of the region server that owns the queue
    * @param queueId a String that identifies the queue
    * @return a list of WALs, null if this region server is dead and has no outstanding queues
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getLogsInQueue(String serverName, String queueId);
+  List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
 
   /**
    * Get a list of all queues for the specified region server.
    * @param serverName the server name of the region server that owns the set of queues
    * @return a list of queueIds, null if this region server is not a replicator.
    */
-  List<String> getAllQueues(String serverName);
+  List<String> getAllQueues(String serverName) throws KeeperException;
+
+  /**
+   * Get the cversion of the replication rs znode, usable as an optimistic lock on the queues.
+   * @return cversion of the replication rs znode
+   * @throws KeeperException zookeeper exception
+   */
+  int getQueuesZNodeCversion() throws KeeperException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bc4498a2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index 43262a0..e1a6a49 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 
 @InterfaceAudience.Private
 public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
@@ -46,7 +47,7 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
   }
 
   @Override
-  public List<String> getLogsInQueue(String serverName, String queueId) {
+  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     znode = ZKUtil.joinZNode(znode, queueId);
     List<String> result = null;
@@ -55,20 +56,32 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of wals for queueId=" + queueId
           + " and serverName=" + serverName, e);
+      throw e; // rethrow so callers can tell a failed read from a null result
     }
     return result;
   }
 
   @Override
-  public List<String> getAllQueues(String serverName) {
+  public List<String> getAllQueues(String serverName) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     List<String> result = null;
     try {
       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
+      throw e; // rethrow so callers can tell a failed read from a null result
     }
     return result;
   }
 
+  @Override public int getQueuesZNodeCversion() throws KeeperException {
+    try {
+      Stat stat = new Stat();
+      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); // only stat is used
+      return stat.getCversion();
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get stat of replication rs node", e);
+      throw e;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bc4498a2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 6a30511..b1d434d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -334,7 +334,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
         // add delete op for peer
         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
       }
-      // add delete op for dead rs
+      // add delete op for dead rs; deleting this child znode bumps the parent's cversion,
+      // which readers compare before and after a scan (optimistic locking) to detect
+      // concurrent queue transfers and obtain a consistent snapshot
       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
       LOG.debug(" The multi list size is: " + listOfOps.size());
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/bc4498a2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 525b7ad..474f497 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -39,6 +40,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
@@ -61,7 +63,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
       return files;
     }
 
-    final Set<String> wals = loadWALsFromQueues();
+    final Set<String> wals;
+    try {
+      // WALs created concurrently may be missing from the returned set, but they are not
+      // in the set of files being checked either, so they won't be deleted.
+      wals = loadWALsFromQueues();
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read zookeeper, skipping checking deletable files", e);
+      return Collections.emptyList(); // delete nothing while queue state is unknown
+    }
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
       public boolean apply(FileStatus file) {
@@ -79,29 +89,40 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
   }
 
   /**
-   * Load all wals in all replication queues from ZK
+   * Load all wals in all replication queues from ZK. The returned set is a consistent snapshot
+   * (the queues znode cversion did not change during the read), so it contains every WAL still
+   * queued even if a concurrent queue failover happens; the read is retried until it succeeds.
+   * However, some WALs created during the call may not be included.
    */
-  private Set<String> loadWALsFromQueues() {
-    List<String> rss = replicationQueues.getListOfReplicators();
-    if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
-      return ImmutableSet.of();
-    }
-    Set<String> wals = Sets.newHashSet();
-    for (String rs: rss) {
-      List<String> listOfPeers = replicationQueues.getAllQueues(rs);
-      // if rs just died, this will be null
-      if (listOfPeers == null) {
-        continue;
+  private Set<String> loadWALsFromQueues() throws KeeperException {
+    for (int retry = 0; ; retry++) {
+      int v0 = replicationQueues.getQueuesZNodeCversion(); // per attempt; stale v0 never matches
+      List<String> rss = replicationQueues.getListOfReplicators();
+      if (rss == null) {
+        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+        return ImmutableSet.of();
       }
-      for (String id : listOfPeers) {
-        List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
-        if (peersWals != null) {
-          wals.addAll(peersWals);
+      Set<String> wals = Sets.newHashSet();
+      for (String rs : rss) {
+        List<String> listOfPeers = replicationQueues.getAllQueues(rs);
+        // if rs just died, this will be null
+        if (listOfPeers == null) {
+          continue;
         }
+        for (String id : listOfPeers) {
+          List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
+          if (peersWals != null) {
+            wals.addAll(peersWals);
+          }
+        }
+      }
+      int v1 = replicationQueues.getQueuesZNodeCversion();
+      if (v0 == v1) {
+        return wals;
       }
+      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
+          v0, v1, retry));
     }
-    return wals;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/bc4498a2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index e560620..f05eceb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -66,7 +67,7 @@ public abstract class TestReplicationStateBasic {
   }
 
   @Test
-  public void testReplicationQueuesClient() throws ReplicationException {
+  public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
     rqc.init();
     // Test methods with empty state
     assertEquals(0, rqc.getListOfReplicators().size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/bc4498a2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 3b56617..487dc49 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -366,6 +367,38 @@ public class TestReplicationSourceManager {
     server.abort("", null);
   }
 
+  @Test
+  public void testFailoverDeadServerCversionChange() throws Exception {
+    LOG.debug("testFailoverDeadServerCversionChange");
+
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server s0 = new DummyServer("cversion-change0.example.org");
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
+    repQueues.init(s0.getServerName().toString());
+    // populate some WAL znodes in s0's queue for peer id "1"
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+    // simulate queue transfer: s1 claims s0's queues below, as if s0 had died
+    Server s1 = new DummyServer("cversion-change1.example.org");
+    ReplicationQueues rq1 =
+        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rq1.init(s1.getServerName().toString());
+
+    ReplicationQueuesClient client =
+        ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
+
+    int v0 = client.getQueuesZNodeCversion();
+    rq1.claimQueues(s0.getServerName().getServerName());
+    int v1 = client.getQueuesZNodeCversion();
+    // cversion should increase by exactly 1: claiming s0's queues deletes its rs znode
+    assertEquals(v0 + 1, v1);
+
+    s0.abort("", null);
+  }
 
   static class DummyNodeFailoverWorker extends Thread {
     private SortedMap<String, SortedSet<String>> logZnodesMap;


[6/6] hbase git commit: HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)

Posted by ap...@apache.org.
HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ea8833fd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ea8833fd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ea8833fd

Branch: refs/heads/master
Commit: ea8833fd63c9ef35292766f646b1e507577e46f5
Parents: f1c1692
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Aug 7 15:08:30 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Aug 7 15:08:30 2015 -0700

----------------------------------------------------------------------
 .../replication/ReplicationQueuesClient.java    | 17 ++++--
 .../ReplicationQueuesClientZKImpl.java          | 17 +++++-
 .../replication/ReplicationQueuesZKImpl.java    |  4 +-
 .../master/ReplicationLogCleaner.java           | 59 +++++++++++++-------
 .../replication/TestReplicationStateBasic.java  |  3 +-
 .../TestReplicationSourceManager.java           | 33 +++++++++++
 6 files changed, 106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index fed1791..5b3e541 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for clients of replication to view replication queues. These queues
@@ -39,21 +39,30 @@ public interface ReplicationQueuesClient {
    * Get a list of all region servers that have outstanding replication queues. These servers could
    * be alive, dead or from a previous run of the cluster.
    * @return a list of server names
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getListOfReplicators();
+  List<String> getListOfReplicators() throws KeeperException;
 
   /**
    * Get a list of all WALs in the given queue on the given region server.
    * @param serverName the server name of the region server that owns the queue
    * @param queueId a String that identifies the queue
    * @return a list of WALs, null if this region server is dead and has no outstanding queues
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getLogsInQueue(String serverName, String queueId);
+  List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
 
   /**
    * Get a list of all queues for the specified region server.
    * @param serverName the server name of the region server that owns the set of queues
    * @return a list of queueIds, null if this region server is not a replicator.
    */
-  List<String> getAllQueues(String serverName);
+  List<String> getAllQueues(String serverName) throws KeeperException;
+
+  /**
+   * Get the cversion of replication rs node. This can be used as optimistic locking to get a
+   * consistent snapshot of the replication queues.
+   * @return cversion of replication rs node
+   */
+  int getQueuesZNodeCversion() throws KeeperException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index 43262a0..e1a6a49 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 
 @InterfaceAudience.Private
 public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
@@ -46,7 +47,7 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
   }
 
   @Override
-  public List<String> getLogsInQueue(String serverName, String queueId) {
+  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     znode = ZKUtil.joinZNode(znode, queueId);
     List<String> result = null;
@@ -55,20 +56,32 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of wals for queueId=" + queueId
           + " and serverName=" + serverName, e);
+      throw e;
     }
     return result;
   }
 
   @Override
-  public List<String> getAllQueues(String serverName) {
+  public List<String> getAllQueues(String serverName) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     List<String> result = null;
     try {
       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
+      throw e; // rethrow so a ZK error is not reported as null (= not a replicator)
     }
     return result;
   }
 
+  @Override public int getQueuesZNodeCversion() throws KeeperException {
+    try {
+      Stat stat = new Stat();
+      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); // data ignored, only stat
+      return stat.getCversion(); // changes when a per-rs child znode is added or removed
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get stat of replication rs node", e);
+      throw e;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 0535b4b2..97763e2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -334,7 +334,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
         // add delete op for peer
         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
       }
-      // add delete op for dead rs
+      // add delete op for dead rs, this will update the cversion of the parent.
+      // The reader will make optimistic locking with this to get a consistent
+      // snapshot
       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
       if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 525b7ad..474f497 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -39,6 +40,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
@@ -61,7 +63,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
       return files;
     }
 
-    final Set<String> wals = loadWALsFromQueues();
+    final Set<String> wals;
+    try {
+      // The concurrently created new WALs may not be included in the return list,
+      // but they won't be deleted because they're not in the checking set.
+      wals = loadWALsFromQueues();
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read zookeeper, skipping checking deletable files");
+      return Collections.emptyList();
+    }
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
       public boolean apply(FileStatus file) {
@@ -79,29 +89,40 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
   }
 
   /**
-   * Load all wals in all replication queues from ZK
+   * Load all wals in all replication queues from ZK. Each attempt reads the rs node cversion
+   * before scanning and retries if it changed by the end, so the result contains every WAL
+   * present in ZooKeeper at the start of the successful attempt, even under concurrent queue
+   * failover. WALs created during the scan may be missed; ZK errors propagate to the caller.
    */
-  private Set<String> loadWALsFromQueues() {
-    List<String> rss = replicationQueues.getListOfReplicators();
-    if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
-      return ImmutableSet.of();
-    }
-    Set<String> wals = Sets.newHashSet();
-    for (String rs: rss) {
-      List<String> listOfPeers = replicationQueues.getAllQueues(rs);
-      // if rs just died, this will be null
-      if (listOfPeers == null) {
-        continue;
+  private Set<String> loadWALsFromQueues() throws KeeperException {
+    for (int retry = 0; ; retry++) {
+      int v0 = replicationQueues.getQueuesZNodeCversion(); // re-read per attempt
+      List<String> rss = replicationQueues.getListOfReplicators();
+      if (rss == null) {
+        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+        return ImmutableSet.of();
       }
-      for (String id : listOfPeers) {
-        List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
-        if (peersWals != null) {
-          wals.addAll(peersWals);
+      Set<String> wals = Sets.newHashSet();
+      for (String rs : rss) {
+        List<String> listOfPeers = replicationQueues.getAllQueues(rs);
+        // if rs just died, this will be null
+        if (listOfPeers == null) {
+          continue;
         }
+        for (String id : listOfPeers) {
+          List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
+          if (peersWals != null) {
+            wals.addAll(peersWals);
+          }
+        }
+      }
+      int v1 = replicationQueues.getQueuesZNodeCversion();
+      if (v0 == v1) {
+        return wals;
       }
+      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
+          v0, v1, retry));
     }
-    return wals;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index e560620..f05eceb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -66,7 +67,7 @@ public abstract class TestReplicationStateBasic {
   }
 
   @Test
-  public void testReplicationQueuesClient() throws ReplicationException {
+  public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
     rqc.init();
     // Test methods with empty state
     assertEquals(0, rqc.getListOfReplicators().size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9315f62..571be26 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -368,6 +369,38 @@ public class TestReplicationSourceManager {
     server.abort("", null);
   }
 
+  @Test
+  public void testFailoverDeadServerCversionChange() throws Exception {
+    LOG.debug("testFailoverDeadServerCversionChange");
+
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server s0 = new DummyServer("cversion-change0.example.org");
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
+    repQueues.init(s0.getServerName().toString());
+    // populate some WAL znodes in s0's queue for peer id "1"
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+    // simulate queue transfer: s1 claims s0's queues below, as if s0 had died
+    Server s1 = new DummyServer("cversion-change1.example.org");
+    ReplicationQueues rq1 =
+        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rq1.init(s1.getServerName().toString());
+
+    ReplicationQueuesClient client =
+        ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
+
+    int v0 = client.getQueuesZNodeCversion();
+    rq1.claimQueues(s0.getServerName().getServerName());
+    int v1 = client.getQueuesZNodeCversion();
+    // cversion should increase by exactly 1: claiming s0's queues deletes its rs znode
+    assertEquals(v0 + 1, v1);
+
+    s0.abort("", null);
+  }
 
   static class DummyNodeFailoverWorker extends Thread {
     private SortedMap<String, SortedSet<String>> logZnodesMap;


[4/6] hbase git commit: HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)

Posted by ap...@apache.org.
HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7abb12be
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7abb12be
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7abb12be

Branch: refs/heads/branch-1.2
Commit: 7abb12be26115eda7341b82b9860990a14bc6040
Parents: bb1407d
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Aug 7 15:08:28 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Aug 7 15:08:28 2015 -0700

----------------------------------------------------------------------
 .../replication/ReplicationQueuesClient.java    | 17 ++++--
 .../ReplicationQueuesClientZKImpl.java          | 17 +++++-
 .../replication/ReplicationQueuesZKImpl.java    |  4 +-
 .../master/ReplicationLogCleaner.java           | 59 +++++++++++++-------
 .../replication/TestReplicationStateBasic.java  |  3 +-
 .../TestReplicationSourceManager.java           | 33 +++++++++++
 6 files changed, 106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7abb12be/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index fed1791..5b3e541 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for clients of replication to view replication queues. These queues
@@ -39,21 +39,30 @@ public interface ReplicationQueuesClient {
    * Get a list of all region servers that have outstanding replication queues. These servers could
    * be alive, dead or from a previous run of the cluster.
    * @return a list of server names
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getListOfReplicators();
+  List<String> getListOfReplicators() throws KeeperException;
 
   /**
    * Get a list of all WALs in the given queue on the given region server.
    * @param serverName the server name of the region server that owns the queue
    * @param queueId a String that identifies the queue
    * @return a list of WALs, null if this region server is dead and has no outstanding queues
+   * @throws KeeperException zookeeper exception
    */
-  List<String> getLogsInQueue(String serverName, String queueId);
+  List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
 
   /**
    * Get a list of all queues for the specified region server.
    * @param serverName the server name of the region server that owns the set of queues
    * @return a list of queueIds, null if this region server is not a replicator.
    */
-  List<String> getAllQueues(String serverName);
+  List<String> getAllQueues(String serverName) throws KeeperException;
+
+  /**
+   * Get the cversion of replication rs node. This can be used as optimistic locking to get a
+   * consistent snapshot of the replication queues.
+   * @return cversion of replication rs node
+   */
+  int getQueuesZNodeCversion() throws KeeperException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7abb12be/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index 43262a0..e1a6a49 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 
 @InterfaceAudience.Private
 public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
@@ -46,7 +47,7 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
   }
 
   @Override
-  public List<String> getLogsInQueue(String serverName, String queueId) {
+  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     znode = ZKUtil.joinZNode(znode, queueId);
     List<String> result = null;
@@ -55,20 +56,32 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of wals for queueId=" + queueId
           + " and serverName=" + serverName, e);
+      throw e;
     }
     return result;
   }
 
   @Override
-  public List<String> getAllQueues(String serverName) {
+  public List<String> getAllQueues(String serverName) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     List<String> result = null;
     try {
       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
+      throw e; // rethrow so a ZK error is not reported as null (= not a replicator)
     }
     return result;
   }
 
+  @Override public int getQueuesZNodeCversion() throws KeeperException {
+    try {
+      Stat stat = new Stat();
+      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); // data ignored, only stat
+      return stat.getCversion(); // changes when a per-rs child znode is added or removed
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get stat of replication rs node", e);
+      throw e;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7abb12be/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 0a6ba44..26ca6ba 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -334,7 +334,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
         // add delete op for peer
         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
       }
-      // add delete op for dead rs
+      // add delete op for dead rs, this will update the cversion of the parent.
+      // The reader will make optimistic locking with this to get a consistent
+      // snapshot
       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
       if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7abb12be/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 525b7ad..474f497 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -39,6 +40,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
@@ -61,7 +63,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
       return files;
     }
 
-    final Set<String> wals = loadWALsFromQueues();
+    final Set<String> wals;
+    try {
+      // The concurrently created new WALs may not be included in the return list,
+      // but they won't be deleted because they're not in the checking set.
+      wals = loadWALsFromQueues();
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read zookeeper, skipping checking deletable files");
+      return Collections.emptyList();
+    }
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
       public boolean apply(FileStatus file) {
@@ -79,29 +89,40 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
   }
 
   /**
-   * Load all wals in all replication queues from ZK
+   * Load all wals in all replication queues from ZK. Each attempt reads the rs node cversion
+   * before scanning and retries if it changed by the end, so the result contains every WAL
+   * present in ZooKeeper at the start of the successful attempt, even under concurrent queue
+   * failover. WALs created during the scan may be missed; ZK errors propagate to the caller.
    */
-  private Set<String> loadWALsFromQueues() {
-    List<String> rss = replicationQueues.getListOfReplicators();
-    if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
-      return ImmutableSet.of();
-    }
-    Set<String> wals = Sets.newHashSet();
-    for (String rs: rss) {
-      List<String> listOfPeers = replicationQueues.getAllQueues(rs);
-      // if rs just died, this will be null
-      if (listOfPeers == null) {
-        continue;
+  private Set<String> loadWALsFromQueues() throws KeeperException {
+    for (int retry = 0; ; retry++) {
+      int v0 = replicationQueues.getQueuesZNodeCversion(); // re-read per attempt
+      List<String> rss = replicationQueues.getListOfReplicators();
+      if (rss == null) {
+        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+        return ImmutableSet.of();
       }
-      for (String id : listOfPeers) {
-        List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
-        if (peersWals != null) {
-          wals.addAll(peersWals);
+      Set<String> wals = Sets.newHashSet();
+      for (String rs : rss) {
+        List<String> listOfPeers = replicationQueues.getAllQueues(rs);
+        // if rs just died, this will be null
+        if (listOfPeers == null) {
+          continue;
         }
+        for (String id : listOfPeers) {
+          List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
+          if (peersWals != null) {
+            wals.addAll(peersWals);
+          }
+        }
+      }
+      int v1 = replicationQueues.getQueuesZNodeCversion();
+      if (v0 == v1) {
+        return wals;
       }
+      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
+          v0, v1, retry));
     }
-    return wals;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/7abb12be/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index e560620..f05eceb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -66,7 +67,7 @@ public abstract class TestReplicationStateBasic {
   }
 
   @Test
-  public void testReplicationQueuesClient() throws ReplicationException {
+  public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
     rqc.init();
     // Test methods with empty state
     assertEquals(0, rqc.getListOfReplicators().size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/7abb12be/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9ec0390..bb8e7bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -367,6 +368,38 @@ public class TestReplicationSourceManager {
     server.abort("", null);
   }
 
+  @Test
+  public void testFailoverDeadServerCversionChange() throws Exception {
+    LOG.debug("testFailoverDeadServerCversionChange");
+
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server s0 = new DummyServer("cversion-change0.example.org");
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
+    repQueues.init(s0.getServerName().toString());
+    // populate s0's replication queue for peer "1" with some log znodes
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+    // simulate queue transfer: s1 will claim the queues owned by s0
+    Server s1 = new DummyServer("cversion-change1.example.org");
+    ReplicationQueues rq1 =
+        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rq1.init(s1.getServerName().toString());
+
+    ReplicationQueuesClient client =
+        ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
+
+    int v0 = client.getQueuesZNodeCversion();
+    rq1.claimQueues(s0.getServerName().getServerName());
+    int v1 = client.getQueuesZNodeCversion();
+    // cversion should increase by 1 since a child node is deleted
+    assertEquals(v0 + 1, v1);
+
+    s0.abort("", null);
+  }
 
   static class DummyNodeFailoverWorker extends Thread {
     private SortedMap<String, SortedSet<String>> logZnodesMap;