Posted to commits@hbase.apache.org by en...@apache.org on 2014/07/15 01:46:27 UTC
git commit: HBASE-11442 ReplicationSourceManager doesn't cleanup the queues for recovered sources (Virag Kothari)
Repository: hbase
Updated Branches:
refs/heads/master 463d52d8c -> 7db2563c6
HBASE-11442 ReplicationSourceManager doesn't cleanup the queues for recovered sources (Virag Kothari)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7db2563c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7db2563c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7db2563c
Branch: refs/heads/master
Commit: 7db2563c6a16b4cc69a2343172e0ff0277f1f0c6
Parents: 463d52d
Author: Enis Soztutar <en...@apache.org>
Authored: Mon Jul 14 16:46:11 2014 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Mon Jul 14 16:46:11 2014 -0700
----------------------------------------------------------------------
.../regionserver/ReplicationSourceManager.java | 51 ++++++++++++++------
.../replication/ReplicationSourceDummy.java | 4 +-
.../TestReplicationSourceManager.java | 46 ++++++++++++++++--
3 files changed, 83 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
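Background on the fix: before this change, cleanOldLogs() bailed out whenever the queue was a recovered one, so the ZooKeeper queue entries claimed from a dead region server were never trimmed. The heart of the change, paraphrased from the hunks below:

    // Before: recovered queues were skipped outright.
    if (queueRecovered || hlogs.first().equals(key)) {
      return;
    }

    // After: recovered queues get their own tracking map and are trimmed too.
    if (queueRecovered) {
      SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
      if (hlogs != null && !hlogs.first().equals(key)) {
        cleanOldLogs(hlogs, key, id);
      }
    }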
http://git-wip-us.apache.org/repos/asf/hbase/blob/7db2563c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index db9c505..e196588 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -30,11 +30,12 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -86,6 +87,8 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Stoppable stopper;
// All logs we are currently tracking
private final Map<String, SortedSet<String>> hlogsById;
+ // Logs for recovered sources we are currently tracking
+ private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
private final Configuration conf;
private final FileSystem fs;
// The path to the latest log we saw, for new coming sources
@@ -126,6 +129,7 @@ public class ReplicationSourceManager implements ReplicationListener {
this.replicationTracker = replicationTracker;
this.stopper = stopper;
this.hlogsById = new HashMap<String, SortedSet<String>>();
+ this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
this.conf = conf;
this.fs = fs;
@@ -177,20 +181,29 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param id id of the peer cluster
* @param queueRecovered Whether this is a recovered queue
*/
- public void cleanOldLogs(String key,
- String id,
- boolean queueRecovered) {
- synchronized (this.hlogsById) {
- SortedSet<String> hlogs = this.hlogsById.get(id);
- if (queueRecovered || hlogs.first().equals(key)) {
- return;
+ public void cleanOldLogs(String key, String id, boolean queueRecovered) {
+ if (queueRecovered) {
+ SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
+ if (hlogs != null && !hlogs.first().equals(key)) {
+ cleanOldLogs(hlogs, key, id);
}
- SortedSet<String> hlogSet = hlogs.headSet(key);
- for (String hlog : hlogSet) {
- this.replicationQueues.removeLog(id, hlog);
+ } else {
+ synchronized (this.hlogsById) {
+ SortedSet<String> hlogs = hlogsById.get(id);
+ if (!hlogs.first().equals(key)) {
+ cleanOldLogs(hlogs, key, id);
+ }
}
- hlogSet.clear();
}
+ }
+
+ private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
+ SortedSet<String> hlogSet = hlogs.headSet(key);
+ LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
+ for (String hlog : hlogSet) {
+ this.replicationQueues.removeLog(id, hlog);
+ }
+ hlogSet.clear();
}
/**
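The private cleanOldLogs() helper above leans on standard java.util semantics: SortedSet.headSet(key) returns a live view of the entries strictly less than key, so clear() on the view also removes those entries from the backing set. A self-contained sketch of that behavior:

    import java.util.SortedSet;
    import java.util.TreeSet;

    public class HeadSetDemo {
      public static void main(String[] args) {
        SortedSet<String> hlogs = new TreeSet<String>();
        hlogs.add("log1");
        hlogs.add("log2");
        hlogs.add("log3");

        // headSet gives a view of everything strictly before "log3" ...
        SortedSet<String> old = hlogs.headSet("log3");
        System.out.println(old);    // prints [log1, log2]

        // ... and clearing the view drops those entries from the backing set.
        old.clear();
        System.out.println(hlogs);  // prints [log3]
      }
    }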
@@ -285,6 +298,14 @@ public class ReplicationSourceManager implements ReplicationListener {
protected Map<String, SortedSet<String>> getHLogs() {
return Collections.unmodifiableMap(hlogsById);
}
+
+ /**
+ * Get a copy of the hlogs of the recovered sources on this rs
+ * @return a sorted set of hlog names
+ */
+ protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
+ return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
+ }
/**
* Get a list of all the normal sources of this rs
@@ -303,7 +324,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
void preLogRoll(Path newLog) throws IOException {
-
synchronized (this.hlogsById) {
String name = newLog.getName();
for (ReplicationSourceInterface source : this.sources) {
@@ -416,6 +436,7 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
this.oldsources.remove(src);
deleteSource(src.getPeerClusterZnode(), false);
+ this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
}
/**
@@ -563,10 +584,12 @@ public class ReplicationSourceManager implements ReplicationListener {
break;
}
oldsources.add(src);
- for (String hlog : entry.getValue()) {
+ SortedSet<String> hlogsSet = entry.getValue();
+ for (String hlog : hlogsSet) {
src.enqueueLog(new Path(oldLogDir, hlog));
}
src.startup();
+ hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
} catch (IOException e) {
// TODO manage it
LOG.error("Failed creating a source", e);
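Taken together, the hunks above give the new map a full lifecycle: NodeFailoverWorker registers a recovered queue's hlog set under its peer id once the recovered source starts, cleanOldLogs() trims it as the source catches up, and closeRecoveredQueue() drops the entry when the queue drains. A standalone model of that pattern (the class and method names here are illustrative, not the HBase API, and the ZooKeeper removeLog call is omitted):

    import java.util.SortedSet;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical sketch of the register/trim/forget lifecycle.
    public class RecoveredQueueTracker {
      private final ConcurrentMap<String, SortedSet<String>> logsByQueue =
          new ConcurrentHashMap<String, SortedSet<String>>();

      // On failover: remember which logs the recovered source still owns.
      public void register(String queueId, SortedSet<String> logs) {
        logsByQueue.put(queueId, logs);
      }

      // As the source catches up: drop everything older than the current log.
      public void trim(String queueId, String currentLog) {
        SortedSet<String> logs = logsByQueue.get(queueId);
        if (logs != null && !logs.first().equals(currentLog)) {
          logs.headSet(currentLog).clear();
        }
      }

      // When the recovered queue is fully drained: forget it.
      public void forget(String queueId) {
        logsByQueue.remove(queueId);
      }
    }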
http://git-wip-us.apache.org/repos/asf/hbase/blob/7db2563c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 13a18ce..f463f76 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -80,7 +80,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public String getPeerClusterId() {
- return peerClusterId;
+ String[] parts = peerClusterId.split("-", 2);
+ return parts.length != 1 ?
+ parts[0] : peerClusterId;
}
@Override
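The dummy now mirrors how a real source reports its peer cluster id for a recovered queue: the queue's znode name is the peer id and the dead server's name joined by a hyphen, so the peer id is the prefix before the first '-'. A quick illustration (the sample znode value is made up):

    // A recovered queue znode has the form "<peerId>-<deadServerName>".
    String peerClusterZnode = "1-hostname1.example.org,60020,1405390000000";
    String[] parts = peerClusterZnode.split("-", 2);
    String peerClusterId = parts.length != 1 ? parts[0] : peerClusterZnode;
    // peerClusterId == "1", matching the "1-<serverName>" id built in the new test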
http://git-wip-us.apache.org/repos/asf/hbase/blob/7db2563c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 09fa096..99ad601 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -27,6 +27,8 @@ import java.util.Collection;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
@@ -55,10 +57,12 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -71,6 +75,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Sets;
+
@Category(MediumTests.class)
public class TestReplicationSourceManager {
@@ -138,14 +144,14 @@ public class TestReplicationSourceManager {
ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
ZKClusterId.setClusterId(zkw, new ClusterId());
-
- replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
- manager = replication.getReplicationManager();
fs = FileSystem.get(conf);
oldLogDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME);
+ replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+ manager = replication.getReplicationManager();
+
logName = HConstants.HREGION_LOGDIR_NAME;
manager.addSource(slaveId);
@@ -274,6 +280,40 @@ public class TestReplicationSourceManager {
assertEquals(1, populatedMap);
server.abort("", null);
}
+
+ @Test
+ public void testCleanupFailoverQueues() throws Exception {
+ final Server server = new DummyServer("hostname1.example.org");
+ ReplicationQueues rq =
+ ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
+ server);
+ rq.init(server.getServerName().toString());
+ // populate some znodes in the peer znode
+ SortedSet<String> files = new TreeSet<String>();
+ files.add("log1");
+ files.add("log2");
+ for (String file : files) {
+ rq.addLog("1", file);
+ }
+ Server s1 = new DummyServer("dummyserver1.example.org");
+ ReplicationQueues rq1 =
+ ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+ rq1.init(s1.getServerName().toString());
+ ReplicationPeers rp1 =
+ ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
+ rp1.init();
+ NodeFailoverWorker w1 =
+ manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
+ new Long(1), new Long(2)));
+ w1.start();
+ w1.join(5000);
+ assertEquals(1, manager.getHlogsByIdRecoveredQueues().size());
+ String id = "1-" + server.getServerName().getServerName();
+ assertEquals(files, manager.getHlogsByIdRecoveredQueues().get(id));
+ manager.cleanOldLogs("log2", id, true);
+ // log1 should be deleted
+ assertEquals(Sets.newHashSet("log2"), manager.getHlogsByIdRecoveredQueues().get(id));
+ }
@Test
public void testNodeFailoverDeadServerParsing() throws Exception {