You are viewing a plain-text version of this content. (The link to the canonical version was lost when this plain-text copy was made.)
Posted to commits@hbase.apache.org by en...@apache.org on 2014/07/15 01:46:27 UTC

git commit: HBASE-11442 ReplicationSourceManager doesn't clean up the queues for recovered sources (Virag Kothari)

Repository: hbase
Updated Branches:
  refs/heads/master 463d52d8c -> 7db2563c6


HBASE-11442 ReplicationSourceManager doesn't clean up the queues for recovered sources (Virag Kothari)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7db2563c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7db2563c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7db2563c

Branch: refs/heads/master
Commit: 7db2563c6a16b4cc69a2343172e0ff0277f1f0c6
Parents: 463d52d
Author: Enis Soztutar <en...@apache.org>
Authored: Mon Jul 14 16:46:11 2014 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Mon Jul 14 16:46:11 2014 -0700

----------------------------------------------------------------------
 .../regionserver/ReplicationSourceManager.java  | 51 ++++++++++++++------
 .../replication/ReplicationSourceDummy.java     |  4 +-
 .../TestReplicationSourceManager.java           | 46 ++++++++++++++++--
 3 files changed, 83 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7db2563c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index db9c505..e196588 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -30,11 +30,12 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -86,6 +87,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Stoppable stopper;
   // All logs we are currently tracking
   private final Map<String, SortedSet<String>> hlogsById;
+  // Logs for recovered sources we are currently tracking
+  private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
   private final Configuration conf;
   private final FileSystem fs;
   // The path to the latest log we saw, for new coming sources
@@ -126,6 +129,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.replicationTracker = replicationTracker;
     this.stopper = stopper;
     this.hlogsById = new HashMap<String, SortedSet<String>>();
+    this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
     this.conf = conf;
     this.fs = fs;
@@ -177,20 +181,29 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param id id of the peer cluster
    * @param queueRecovered Whether this is a recovered queue
    */
-  public void cleanOldLogs(String key,
-                           String id,
-                           boolean queueRecovered) {
-    synchronized (this.hlogsById) {
-      SortedSet<String> hlogs = this.hlogsById.get(id);
-      if (queueRecovered || hlogs.first().equals(key)) {
-        return;
+  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
+    if (queueRecovered) {
+      SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
+      if (hlogs != null && !hlogs.first().equals(key)) {
+        cleanOldLogs(hlogs, key, id);
       }
-      SortedSet<String> hlogSet = hlogs.headSet(key);
-      for (String hlog : hlogSet) {
-        this.replicationQueues.removeLog(id, hlog);
+    } else {
+      synchronized (this.hlogsById) {
+        SortedSet<String> hlogs = hlogsById.get(id);
+        if (!hlogs.first().equals(key)) {
+          cleanOldLogs(hlogs, key, id);
+        }
       }
-      hlogSet.clear();
     }
+ }
+  
+  private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
+    SortedSet<String> hlogSet = hlogs.headSet(key);
+    LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
+    for (String hlog : hlogSet) {
+      this.replicationQueues.removeLog(id, hlog);
+    }
+    hlogSet.clear();
   }
 
   /**
@@ -285,6 +298,14 @@ public class ReplicationSourceManager implements ReplicationListener {
   protected Map<String, SortedSet<String>> getHLogs() {
     return Collections.unmodifiableMap(hlogsById);
   }
+  
+  /**
+   * Get an unmodifiable view of the hlogs of the recovered sources on this rs
+   * @return a map from recovered queue id to the sorted set of its hlog names
+   */
+  protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
+    return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
+  }
 
   /**
    * Get a list of all the normal sources of this rs
@@ -303,7 +324,6 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   void preLogRoll(Path newLog) throws IOException {
-
     synchronized (this.hlogsById) {
       String name = newLog.getName();
       for (ReplicationSourceInterface source : this.sources) {
@@ -416,6 +436,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
     this.oldsources.remove(src);
     deleteSource(src.getPeerClusterZnode(), false);
+    this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
   }
 
   /**
@@ -563,10 +584,12 @@ public class ReplicationSourceManager implements ReplicationListener {
             break;
           }
           oldsources.add(src);
-          for (String hlog : entry.getValue()) {
+          SortedSet<String> hlogsSet = entry.getValue();
+          for (String hlog : hlogsSet) {
             src.enqueueLog(new Path(oldLogDir, hlog));
           }
           src.startup();
+          hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
         } catch (IOException e) {
           // TODO manage it
           LOG.error("Failed creating a source", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7db2563c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 13a18ce..f463f76 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -80,7 +80,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   @Override
   public String getPeerClusterId() {
-    return peerClusterId;
+    String[] parts = peerClusterId.split("-", 2);
+    return parts.length != 1 ?
+        parts[0] : peerClusterId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/7db2563c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 09fa096..99ad601 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -27,6 +27,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -55,10 +57,12 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -71,6 +75,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Sets;
+
 @Category(MediumTests.class)
 public class TestReplicationSourceManager {
 
@@ -138,14 +144,14 @@ public class TestReplicationSourceManager {
     ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
 
     ZKClusterId.setClusterId(zkw, new ClusterId());
-
-    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
-    manager = replication.getReplicationManager();
     fs = FileSystem.get(conf);
     oldLogDir = new Path(utility.getDataTestDir(),
         HConstants.HREGION_OLDLOGDIR_NAME);
     logDir = new Path(utility.getDataTestDir(),
         HConstants.HREGION_LOGDIR_NAME);
+    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+    
     logName = HConstants.HREGION_LOGDIR_NAME;
 
     manager.addSource(slaveId);
@@ -274,6 +280,40 @@ public class TestReplicationSourceManager {
     assertEquals(1, populatedMap);
     server.abort("", null);
   }
+  
+  @Test
+  public void testCleanupFailoverQueues() throws Exception {
+    final Server server = new DummyServer("hostname1.example.org");
+    ReplicationQueues rq =
+        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
+          server);
+    rq.init(server.getServerName().toString());
+    // populate some znodes in the peer znode
+    SortedSet<String> files = new TreeSet<String>();
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      rq.addLog("1", file);
+    }
+    Server s1 = new DummyServer("dummyserver1.example.org");
+    ReplicationQueues rq1 =
+        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rq1.init(s1.getServerName().toString());
+    ReplicationPeers rp1 =
+        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rp1.init();
+    NodeFailoverWorker w1 =
+        manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
+            new Long(1), new Long(2)));
+    w1.start();
+    w1.join(5000);
+    assertEquals(1, manager.getHlogsByIdRecoveredQueues().size());
+    String id = "1-" + server.getServerName().getServerName();
+    assertEquals(files, manager.getHlogsByIdRecoveredQueues().get(id));
+    manager.cleanOldLogs("log2", id, true);
+    // log1 should be deleted
+    assertEquals(Sets.newHashSet("log2"), manager.getHlogsByIdRecoveredQueues().get(id));
+  }
 
   @Test
   public void testNodeFailoverDeadServerParsing() throws Exception {