You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2013/07/18 23:10:33 UTC

svn commit: r1504664 - in /hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver: ReplicationSource.java ReplicationSourceManager.java

Author: larsh
Date: Thu Jul 18 21:10:33 2013
New Revision: 1504664

URL: http://svn.apache.org/r1504664
Log:
HBASE-8599 HLogs in ZK are not cleaned up when replication lag is minimal (Varun Sharma)

Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1504664&r1=1504663&r2=1504664&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Jul 18 21:10:33 2013
@@ -282,6 +282,10 @@ public class ReplicationSource extends T
           sleepMultiplier++;
         }
         continue;
+      } else if (oldPath != null && !oldPath.getName().equals(getCurrentPath().getName())) {
+        this.manager.cleanOldLogs(getCurrentPath().getName(),
+                                  this.peerId,
+                                  this.replicationQueueInfo.isQueueRecovered());
       }
       boolean currentWALisBeingWrittenTo = false;
       //For WAL files we own (rather than recovered), take a snapshot of whether the

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1504664&r1=1504663&r2=1504664&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Thu Jul 18 21:10:33 2013
@@ -155,15 +155,29 @@ public class ReplicationSourceManager {
     if (holdLogInZK) {
      return;
     }
+    cleanOldLogs(key, id, queueRecovered);
+  }
+
+  /**
+   * Cleans all log files older than the given one from ZK; the given log itself is
+   * kept. Called when we are sure that a log file is closed and has no more entries.
+   * @param key Name of the log file; only logs sorting before it are removed
+   * @param id id of the peer cluster
+   * @param queueRecovered Whether this is a recovered queue
+   */
+  public void cleanOldLogs(String key,
+                           String id,
+                           boolean queueRecovered) {
     synchronized (this.hlogsById) {
       SortedSet<String> hlogs = this.hlogsById.get(id);
-      if (!queueRecovered && !hlogs.first().equals(key)) {
-        SortedSet<String> hlogSet = hlogs.headSet(key);
-        for (String hlog : hlogSet) {
-          this.zkHelper.removeLogFromList(hlog, id);
-        }
-        hlogSet.clear();
+      if (queueRecovered || hlogs.first().equals(key)) {
+        return;
+      }
+      SortedSet<String> hlogSet = hlogs.headSet(key);
+      for (String hlog : hlogSet) {
+        this.zkHelper.removeLogFromList(hlog, id);
       }
+      hlogSet.clear();
     }
   }