You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/08/08 00:14:56 UTC
[6/6] hbase git commit: HBASE-12865 WALs may be deleted before they
are replicated to peers (He Liangliang)
HBASE-12865 WALs may be deleted before they are replicated to peers (He Liangliang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ea8833fd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ea8833fd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ea8833fd
Branch: refs/heads/master
Commit: ea8833fd63c9ef35292766f646b1e507577e46f5
Parents: f1c1692
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Aug 7 15:08:30 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Aug 7 15:08:30 2015 -0700
----------------------------------------------------------------------
.../replication/ReplicationQueuesClient.java | 17 ++++--
.../ReplicationQueuesClientZKImpl.java | 17 +++++-
.../replication/ReplicationQueuesZKImpl.java | 4 +-
.../master/ReplicationLogCleaner.java | 59 +++++++++++++-------
.../replication/TestReplicationStateBasic.java | 3 +-
.../TestReplicationSourceManager.java | 33 +++++++++++
6 files changed, 106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index fed1791..5b3e541 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
+import org.apache.zookeeper.KeeperException;
/**
* This provides an interface for clients of replication to view replication queues. These queues
@@ -39,21 +39,30 @@ public interface ReplicationQueuesClient {
* Get a list of all region servers that have outstanding replication queues. These servers could
* be alive, dead or from a previous run of the cluster.
* @return a list of server names
+ * @throws KeeperException zookeeper exception
*/
- List<String> getListOfReplicators();
+ List<String> getListOfReplicators() throws KeeperException;
/**
* Get a list of all WALs in the given queue on the given region server.
* @param serverName the server name of the region server that owns the queue
* @param queueId a String that identifies the queue
* @return a list of WALs, null if this region server is dead and has no outstanding queues
+ * @throws KeeperException zookeeper exception
*/
- List<String> getLogsInQueue(String serverName, String queueId);
+ List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
/**
* Get a list of all queues for the specified region server.
* @param serverName the server name of the region server that owns the set of queues
* @return a list of queueIds, null if this region server is not a replicator.
*/
- List<String> getAllQueues(String serverName);
+ List<String> getAllQueues(String serverName) throws KeeperException;
+
+ /**
+ * Get the cversion of replication rs node. This can be used for optimistic locking to get a
+ * consistent snapshot of the replication queues.
+ * @return cversion of replication rs node
+ */
+ int getQueuesZNodeCversion() throws KeeperException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index 43262a0..e1a6a49 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
@InterfaceAudience.Private
public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
@@ -46,7 +47,7 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
}
@Override
- public List<String> getLogsInQueue(String serverName, String queueId) {
+ public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
znode = ZKUtil.joinZNode(znode, queueId);
List<String> result = null;
@@ -55,20 +56,32 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of wals for queueId=" + queueId
+ " and serverName=" + serverName, e);
+ throw e;
}
return result;
}
@Override
- public List<String> getAllQueues(String serverName) {
+ public List<String> getAllQueues(String serverName) throws KeeperException {
String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
+ throw e;
}
return result;
}
+ @Override public int getQueuesZNodeCversion() throws KeeperException {
+ try {
+ Stat stat = new Stat();
+ ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
+ return stat.getCversion();
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed to get stat of replication rs node", e);
+ throw e;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 0535b4b2..97763e2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -334,7 +334,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
// add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
}
- // add delete op for dead rs
+ // add delete op for dead rs; this will update the cversion of the parent.
+ // Readers use the cversion for optimistic locking to get a consistent
+ // snapshot
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 525b7ad..474f497 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -39,6 +40,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
+import org.apache.zookeeper.KeeperException;
/**
* Implementation of a log cleaner that checks if a log is still scheduled for
@@ -61,7 +63,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
return files;
}
- final Set<String> wals = loadWALsFromQueues();
+ final Set<String> wals;
+ try {
+ // The concurrently created new WALs may not be included in the return list,
+ // but they won't be deleted because they're not in the checking set.
+ wals = loadWALsFromQueues();
+ } catch (KeeperException e) {
+ LOG.warn("Failed to read zookeeper, skipping checking deletable files");
+ return Collections.emptyList();
+ }
return Iterables.filter(files, new Predicate<FileStatus>() {
@Override
public boolean apply(FileStatus file) {
@@ -79,29 +89,40 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
}
/**
- * Load all wals in all replication queues from ZK
+ * Load all wals in all replication queues from ZK. This method guarantees to return a
+ * snapshot which contains all WALs in the zookeeper at the start of this call even if there
+ * is concurrent queue failover. However, some newly created WALs during the call may
+ * not be included.
*/
- private Set<String> loadWALsFromQueues() {
- List<String> rss = replicationQueues.getListOfReplicators();
- if (rss == null) {
- LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
- return ImmutableSet.of();
- }
- Set<String> wals = Sets.newHashSet();
- for (String rs: rss) {
- List<String> listOfPeers = replicationQueues.getAllQueues(rs);
- // if rs just died, this will be null
- if (listOfPeers == null) {
- continue;
+ private Set<String> loadWALsFromQueues() throws KeeperException {
+ int v0 = replicationQueues.getQueuesZNodeCversion();
+ for (int retry = 0; ; retry++) {
+ List<String> rss = replicationQueues.getListOfReplicators();
+ if (rss == null) {
+ LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+ return ImmutableSet.of();
}
- for (String id : listOfPeers) {
- List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
- if (peersWals != null) {
- wals.addAll(peersWals);
+ Set<String> wals = Sets.newHashSet();
+ for (String rs : rss) {
+ List<String> listOfPeers = replicationQueues.getAllQueues(rs);
+ // if rs just died, this will be null
+ if (listOfPeers == null) {
+ continue;
}
+ for (String id : listOfPeers) {
+ List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
+ if (peersWals != null) {
+ wals.addAll(peersWals);
+ }
+ }
+ }
+ int v1 = replicationQueues.getQueuesZNodeCversion();
+ if (v0 == v1) {
+ return wals;
}
+ LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
+ v0, v1, retry));
}
- return wals;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index e560620..f05eceb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.zookeeper.KeeperException;
import org.junit.Before;
import org.junit.Test;
@@ -66,7 +67,7 @@ public abstract class TestReplicationStateBasic {
}
@Test
- public void testReplicationQueuesClient() throws ReplicationException {
+ public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
rqc.init();
// Test methods with empty state
assertEquals(0, rqc.getListOfReplicators().size());
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea8833fd/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9315f62..571be26 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -368,6 +369,38 @@ public class TestReplicationSourceManager {
server.abort("", null);
}
+ @Test
+ public void testFailoverDeadServerCversionChange() throws Exception {
+ LOG.debug("testFailoverDeadServerCversionChange");
+
+ conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+ final Server s0 = new DummyServer("cversion-change0.example.org");
+ ReplicationQueues repQueues =
+ ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
+ repQueues.init(s0.getServerName().toString());
+ // populate some znodes in the peer znode
+ files.add("log1");
+ files.add("log2");
+ for (String file : files) {
+ repQueues.addLog("1", file);
+ }
+ // simulate queue transfer
+ Server s1 = new DummyServer("cversion-change1.example.org");
+ ReplicationQueues rq1 =
+ ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+ rq1.init(s1.getServerName().toString());
+
+ ReplicationQueuesClient client =
+ ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
+
+ int v0 = client.getQueuesZNodeCversion();
+ rq1.claimQueues(s0.getServerName().getServerName());
+ int v1 = client.getQueuesZNodeCversion();
+ // cversion should increase by 1 since a child node is deleted
+ assertEquals(v0 + 1, v1);
+
+ s0.abort("", null);
+ }
static class DummyNodeFailoverWorker extends Thread {
private SortedMap<String, SortedSet<String>> logZnodesMap;