You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2013/07/18 23:10:33 UTC
svn commit: r1504664 - in
/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver:
ReplicationSource.java ReplicationSourceManager.java
Author: larsh
Date: Thu Jul 18 21:10:33 2013
New Revision: 1504664
URL: http://svn.apache.org/r1504664
Log:
HBASE-8599 HLogs in ZK are not cleaned up when replication lag is minimal (Varun Sharma)
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1504664&r1=1504663&r2=1504664&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Jul 18 21:10:33 2013
@@ -282,6 +282,10 @@ public class ReplicationSource extends T
sleepMultiplier++;
}
continue;
+ } else if (oldPath != null && !oldPath.getName().equals(getCurrentPath().getName())) {
+ this.manager.cleanOldLogs(getCurrentPath().getName(),
+ this.peerId,
+ this.replicationQueueInfo.isQueueRecovered());
}
boolean currentWALisBeingWrittenTo = false;
//For WAL files we own (rather than recovered), take a snapshot of whether the
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1504664&r1=1504663&r2=1504664&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Thu Jul 18 21:10:33 2013
@@ -155,15 +155,29 @@ public class ReplicationSourceManager {
if (holdLogInZK) {
return;
}
+ cleanOldLogs(key, id, queueRecovered);
+ }
+
+ /**
+ * Cleans all log files older than the given one from ZK; the given log itself is
+ * kept. Called when we are sure that a log file is closed and has no more entries.
+ * @param key Name of the log file; every log sorting before it is removed
+ * @param id id of the peer cluster
+ * @param queueRecovered Whether this is a recovered queue
+ */
+ public void cleanOldLogs(String key,
+ String id,
+ boolean queueRecovered) {
synchronized (this.hlogsById) {
SortedSet<String> hlogs = this.hlogsById.get(id);
- if (!queueRecovered && !hlogs.first().equals(key)) {
- SortedSet<String> hlogSet = hlogs.headSet(key);
- for (String hlog : hlogSet) {
- this.zkHelper.removeLogFromList(hlog, id);
- }
- hlogSet.clear();
+ if (queueRecovered || hlogs.first().equals(key)) {
+ return;
+ }
+ SortedSet<String> hlogSet = hlogs.headSet(key);
+ for (String hlog : hlogSet) {
+ this.zkHelper.removeLogFromList(hlog, id);
}
+ hlogSet.clear();
}
}