You are viewing a plain-text version of this content. The hyperlink to the canonical version ("here") was lost in the plain-text conversion.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/04 07:39:34 UTC

[32/50] [abbrv] hbase git commit: HBASE-16524 Procedure v2 - Compute WALs cleanup on wal modification and not on every sync (Matteo Bertozzi)

HBASE-16524 Procedure v2 - Compute WALs cleanup on wal modification and not on every sync (Matteo Bertozzi)

Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/319ecd86
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/319ecd86
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/319ecd86

Branch: refs/heads/hbase-12439
Commit: 319ecd867a2903c4ce03c38f6ffec62ada1a6049
Parents: ccb8d67
Author: Michael Stack <st...@apache.org>
Authored: Tue Dec 27 13:53:43 2016 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Tue Dec 27 16:12:45 2016 -0800

----------------------------------------------------------------------
 .../procedure2/store/ProcedureStoreTracker.java | 195 +++++++++----------
 .../store/wal/ProcedureWALFormatReader.java     |  24 ++-
 .../procedure2/store/wal/WALProcedureStore.java | 146 +++++++++-----
 .../store/TestProcedureStoreTracker.java        | 109 +++--------
 .../store/wal/TestWALProcedureStore.java        | 117 ++++++++---
 5 files changed, 322 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/319ecd86/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 7ba72f2..0899767 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -156,11 +156,18 @@ public class ProcedureStoreTracker {
       partial = false;
     }
 
-    public BitSetNode(BitSetNode other) {
+    public BitSetNode(final BitSetNode other, final boolean resetDelete) {
       this.start = other.start;
       this.partial = other.partial;
       this.updated = other.updated.clone();
-      this.deleted = other.deleted.clone();
+      if (resetDelete) {
+        this.deleted = new long[other.deleted.length];
+        for (int i = 0; i < this.deleted.length; ++i) {
+          this.deleted[i] = ~(other.updated[i]);
+        }
+      } else {
+        this.deleted = other.deleted.clone();
+      }
     }
 
     public void update(final long procId) {
@@ -171,11 +178,11 @@ public class ProcedureStoreTracker {
       updateState(procId, true);
     }
 
-    public Long getStart() {
+    public long getStart() {
       return start;
     }
 
-    public Long getEnd() {
+    public long getEnd() {
       return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1;
     }
 
@@ -250,33 +257,6 @@ public class ProcedureStoreTracker {
     }
 
     /**
-     * If an active (non-deleted) procedure in current BitSetNode has been updated in {@code other}
-     * BitSetNode, then delete it from current node.
-     * @return true if node changed, i.e. some procedure(s) from {@code other} was subtracted from
-     * current node.
-     */
-    public boolean subtract(BitSetNode other) {
-      // Assert that other node intersects with this node.
-      assert !(other.getEnd() < this.start) && !(this.getEnd() < other.start);
-      int thisOffset = 0, otherOffset = 0;
-      if (this.start < other.start) {
-        thisOffset = (int) (other.start - this.start) / BITS_PER_WORD;
-      } else {
-        otherOffset = (int) (this.start - other.start) / BITS_PER_WORD;
-      }
-      int size = Math.min(this.updated.length - thisOffset, other.updated.length - otherOffset);
-      boolean nonZeroIntersect = false;
-      for (int i = 0; i < size; i++) {
-        long intersect = ~this.deleted[thisOffset + i] & other.updated[otherOffset + i];
-        if (intersect != 0) {
-          this.deleted[thisOffset + i] |= intersect;
-          nonZeroIntersect = true;
-        }
-      }
-      return nonZeroIntersect;
-    }
-
-    /**
      * Convert to
      * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode
      * protobuf.
@@ -292,7 +272,6 @@ public class ProcedureStoreTracker {
       return builder.build();
     }
 
-
     // ========================================================================
     //  Grow/Merge Helpers
     // ========================================================================
@@ -461,20 +440,22 @@ public class ProcedureStoreTracker {
   /**
    * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
    */
-  public void resetTo(ProcedureStoreTracker tracker) {
+  public void resetTo(final ProcedureStoreTracker tracker) {
+    resetTo(tracker, false);
+  }
+
+  public void resetTo(final ProcedureStoreTracker tracker, final boolean resetDelete) {
     this.partial = tracker.partial;
     this.minUpdatedProcId = tracker.minUpdatedProcId;
     this.maxUpdatedProcId = tracker.maxUpdatedProcId;
     this.keepDeletes = tracker.keepDeletes;
     for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
-      map.put(entry.getKey(), new BitSetNode(entry.getValue()));
+      map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
     }
   }
 
   public void insert(long procId) {
-    BitSetNode node = getOrCreateNode(procId);
-    node.update(procId);
-    trackProcIds(procId);
+    insert(null, procId);
   }
 
   public void insert(final long[] procIds) {
@@ -484,46 +465,108 @@ public class ProcedureStoreTracker {
   }
 
   public void insert(final long procId, final long[] subProcIds) {
-    update(procId);
+    BitSetNode node = null;
+    node = update(node, procId);
     for (int i = 0; i < subProcIds.length; ++i) {
-      insert(subProcIds[i]);
+      node = insert(node, subProcIds[i]);
     }
   }
 
+  private BitSetNode insert(BitSetNode node, final long procId) {
+    if (node == null || !node.contains(procId)) {
+      node = getOrCreateNode(procId);
+    }
+    node.update(procId);
+    trackProcIds(procId);
+    return node;
+  }
+
   public void update(long procId) {
-    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
-    assert entry != null : "expected node to update procId=" + procId;
+    update(null, procId);
+  }
 
-    BitSetNode node = entry.getValue();
-    assert node.contains(procId);
+  private BitSetNode update(BitSetNode node, final long procId) {
+    node = lookupClosestNode(node, procId);
+    assert node != null : "expected node to update procId=" + procId;
+    assert node.contains(procId) : "expected procId=" + procId + " in the node";
     node.update(procId);
     trackProcIds(procId);
+    return node;
   }
 
   public void delete(long procId) {
-    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
-    assert entry != null : "expected node to delete procId=" + procId;
+    delete(null, procId);
+  }
 
-    BitSetNode node = entry.getValue();
-    assert node.contains(procId) : "expected procId in the node";
-    node.delete(procId);
+  public void delete(final long[] procIds) {
+    Arrays.sort(procIds);
+    BitSetNode node = null;
+    for (int i = 0; i < procIds.length; ++i) {
+      node = delete(node, procIds[i]);
+    }
+  }
 
+  private BitSetNode delete(BitSetNode node, final long procId) {
+    node = lookupClosestNode(node, procId);
+    assert node != null : "expected node to delete procId=" + procId;
+    assert node.contains(procId) : "expected procId=" + procId + " in the node";
+    node.delete(procId);
     if (!keepDeletes && node.isEmpty()) {
       // TODO: RESET if (map.size() == 1)
-      map.remove(entry.getKey());
+      map.remove(node.getStart());
     }
 
     trackProcIds(procId);
+    return node;
   }
 
-  public void delete(long[] procIds) {
-    // TODO: optimize
-    Arrays.sort(procIds);
-    for (int i = 0; i < procIds.length; ++i) {
-      delete(procIds[i]);
+  @InterfaceAudience.Private
+  public void setDeleted(final long procId, final boolean isDeleted) {
+    BitSetNode node = getOrCreateNode(procId);
+    assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
+    node.updateState(procId, isDeleted);
+    trackProcIds(procId);
+  }
+
+  public void setDeletedIfSet(final long... procId) {
+    BitSetNode node = null;
+    for (int i = 0; i < procId.length; ++i) {
+      node = lookupClosestNode(node, procId[i]);
+      if (node != null && node.isUpdated(procId[i])) {
+        node.delete(procId[i]);
+      }
     }
   }
 
+  public void setDeletedIfSet(final ProcedureStoreTracker tracker) {
+    BitSetNode trackerNode = null;
+    for (BitSetNode node: map.values()) {
+      final long minProcId = node.getStart();
+      final long maxProcId = node.getEnd();
+      for (long procId = minProcId; procId <= maxProcId; ++procId) {
+        if (!node.isUpdated(procId)) continue;
+
+        trackerNode = tracker.lookupClosestNode(trackerNode, procId);
+        if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isUpdated(procId)) {
+          // the procedure was removed or updated
+          node.delete(procId);
+        }
+      }
+    }
+  }
+
+  /**
+   * lookup the node containing the specified procId.
+   * @param node cached node to check before doing a lookup
+   * @param procId the procId to lookup
+   * @return the node that may contains the procId or null
+   */
+  private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) {
+    if (node != null && node.contains(procId)) return node;
+    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+    return entry != null ? entry.getValue() : null;
+  }
+
   private void trackProcIds(long procId) {
     minUpdatedProcId = Math.min(minUpdatedProcId, procId);
     maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
@@ -537,14 +580,6 @@ public class ProcedureStoreTracker {
     return maxUpdatedProcId;
   }
 
-  @InterfaceAudience.Private
-  public void setDeleted(final long procId, final boolean isDeleted) {
-    BitSetNode node = getOrCreateNode(procId);
-    assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
-    node.updateState(procId, isDeleted);
-    trackProcIds(procId);
-  }
-
   public void reset() {
     this.keepDeletes = false;
     this.partial = false;
@@ -632,11 +667,6 @@ public class ProcedureStoreTracker {
     return true;
   }
 
-  public boolean isTracking(long minId, long maxId) {
-    // TODO: we can make it more precise, instead of looking just at the block
-    return map.floorEntry(minId) != null || map.floorEntry(maxId) != null;
-  }
-
   /**
    * Clears the list of updated procedure ids. This doesn't affect global list of active
    * procedure ids.
@@ -737,37 +767,6 @@ public class ProcedureStoreTracker {
     }
   }
 
-  /**
-   * Iterates over
-   * {@link BitSetNode}s in this.map and subtracts with corresponding ones from {@code other}
-   * tracker.
-   * @return true if tracker changed, i.e. some procedure from {@code other} were subtracted from
-   * current tracker.
-   */
-  public boolean subtract(ProcedureStoreTracker other) {
-    // Can not intersect partial bitmap.
-    assert !partial && !other.partial;
-    boolean nonZeroIntersect = false;
-    for (Map.Entry<Long, BitSetNode> currentEntry : map.entrySet()) {
-      BitSetNode currentBitSetNode = currentEntry.getValue();
-      Map.Entry<Long, BitSetNode> otherTrackerEntry = other.map.floorEntry(currentEntry.getKey());
-      if (otherTrackerEntry == null  // No node in other map with key <= currentEntry.getKey().
-          // First entry in other map doesn't intersect with currentEntry.
-          || otherTrackerEntry.getValue().getEnd() < currentEntry.getKey()) {
-        otherTrackerEntry = other.map.ceilingEntry(currentEntry.getKey());
-        if (otherTrackerEntry == null || !currentBitSetNode.contains(otherTrackerEntry.getKey())) {
-          // No node in other map intersects with currentBitSetNode's range.
-          continue;
-        }
-      }
-      do {
-        nonZeroIntersect |= currentEntry.getValue().subtract(otherTrackerEntry.getValue());
-        otherTrackerEntry = other.map.higherEntry(otherTrackerEntry.getKey());
-      } while (otherTrackerEntry != null && currentBitSetNode.contains(otherTrackerEntry.getKey()));
-    }
-    return nonZeroIntersect;
-  }
-
   // ========================================================================
   //  Convert to/from Protocol Buffer.
   // ========================================================================

http://git-wip-us.apache.org/repos/asf/hbase/blob/319ecd86/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index e5c8fca..aeae569 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -101,8 +101,18 @@ public class ProcedureWALFormatReader {
   private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
   private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
 
-  // private long compactionLogId;
-  private long maxProcId = 0;
+  private final ProcedureWALFormat.Loader loader;
+
+  /**
+   * Global tracker that will be used by the WALProcedureStore after load.
+   * If the last WAL was closed cleanly we already have a full tracker ready to be used.
+   * If the last WAL was truncated (e.g. master killed) the tracker will be empty
+   * and the 'partial' flag will be set. In this case on WAL replay we are going
+   * to rebuild the tracker.
+   */
+  private final ProcedureStoreTracker tracker;
+  // private final boolean hasFastStartSupport;
+
   /**
    * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we
    * re-build the list of procedures updated in that WAL because we need it for log cleaning
@@ -113,13 +123,9 @@ public class ProcedureWALFormatReader {
    * {@link ProcedureStoreTracker.BitSetNode#subtract(ProcedureStoreTracker.BitSetNode)}).
    */
   private ProcedureStoreTracker localTracker;
-  private final ProcedureWALFormat.Loader loader;
-  /**
-   * Global tracker. If set to partial, it will be updated as procedures are loaded from wals,
-   * otherwise not.
-   */
-  private final ProcedureStoreTracker tracker;
-  // private final boolean hasFastStartSupport;
+
+  // private long compactionLogId;
+  private long maxProcId = 0;
 
   public ProcedureWALFormatReader(final ProcedureStoreTracker tracker,
       ProcedureWALFormat.Loader loader) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/319ecd86/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 3884e39..922b681 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -72,6 +72,14 @@ public class WALProcedureStore extends ProcedureStoreBase {
     void recoverFileLease(FileSystem fs, Path path) throws IOException;
   }
 
+  public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
+    "hbase.procedure.store.wal.warn.threshold";
+  private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 64;
+
+  public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY =
+    "hbase.procedure.store.wal.exec.cleanup.on.load";
+  private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true;
+
   public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
     "hbase.procedure.store.wal.max.retries.before.roll";
   private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
@@ -106,6 +114,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
   private static final int DEFAULT_SYNC_STATS_COUNT = 10;
 
   private final LinkedList<ProcedureWALFile> logs = new LinkedList<>();
+  private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker();
   private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition waitCond = lock.newCondition();
@@ -132,6 +141,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
   private Thread syncThread;
   private ByteSlot[] slots;
 
+  private int walCountWarnThreshold;
   private int maxRetriesBeforeRoll;
   private int maxSyncFailureRoll;
   private int waitBeforeRoll;
@@ -195,6 +205,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
 
     // Tunings
+    walCountWarnThreshold =
+      conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD);
     maxRetriesBeforeRoll =
       conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
     maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
@@ -257,6 +269,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       log.close();
     }
     logs.clear();
+    loading.set(true);
   }
 
   private void sendStopSignal() {
@@ -335,24 +348,25 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   @Override
   public void load(final ProcedureLoader loader) throws IOException {
-    if (logs.isEmpty()) {
-      throw new RuntimeException("recoverLease() must be called before loading data");
-    }
+    lock.lock();
+    try {
+      if (logs.isEmpty()) {
+        throw new RuntimeException("recoverLease() must be called before loading data");
+      }
 
-    // Nothing to do, If we have only the current log.
-    if (logs.size() == 1) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No state logs to replay.");
+      // Nothing to do, If we have only the current log.
+      if (logs.size() == 1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No state logs to replay.");
+        }
+        loader.setMaxProcId(0);
+        return;
       }
-      loader.setMaxProcId(0);
-      loading.set(false);
-      return;
-    }
 
-    // Load the old logs
-    Iterator<ProcedureWALFile> it = logs.descendingIterator();
-    it.next(); // Skip the current log
-    try {
+      // Load the old logs
+      final Iterator<ProcedureWALFile> it = logs.descendingIterator();
+      it.next(); // Skip the current log
+
       ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
         @Override
         public void setMaxProcId(long maxProcId) {
@@ -379,7 +393,32 @@ public class WALProcedureStore extends ProcedureStoreBase {
         }
       });
     } finally {
-      loading.set(false);
+      try {
+        // try to cleanup inactive wals and complete the operation
+        buildHoldingCleanupTracker();
+        tryCleanupLogsOnLoad();
+        loading.set(false);
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private void tryCleanupLogsOnLoad() {
+    // nothing to cleanup.
+    if (logs.size() <= 1) return;
+
+    // the config says to not cleanup wals on load.
+    if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY,
+      DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)) {
+      LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs());
+      return;
+    }
+
+    try {
+      periodicRoll();
+    } catch (IOException e) {
+      LOG.warn("unable to cleanup logs on load: " + e.getMessage(), e);
     }
   }
 
@@ -634,16 +673,20 @@ public class WALProcedureStore extends ProcedureStoreBase {
           storeTracker.insert(subProcIds);
         } else {
           storeTracker.insert(procId, subProcIds);
+          holdingCleanupTracker.setDeletedIfSet(procId);
         }
         break;
       case UPDATE:
         storeTracker.update(procId);
+        holdingCleanupTracker.setDeletedIfSet(procId);
         break;
       case DELETE:
         if (subProcIds != null && subProcIds.length > 0) {
           storeTracker.delete(subProcIds);
+          holdingCleanupTracker.setDeletedIfSet(subProcIds);
         } else {
           storeTracker.delete(procId);
+          holdingCleanupTracker.setDeletedIfSet(procId);
         }
         break;
       default:
@@ -948,6 +991,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
     lastRollTs.set(rollTs);
     logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));
 
+    // if it's the first next WAL being added, build the holding cleanup tracker
+    if (logs.size() == 2) {
+      buildHoldingCleanupTracker();
+    } else if (logs.size() > walCountWarnThreshold) {
+      LOG.warn("procedure WALs count=" + logs.size() +
+        " above the warning threshold " + walCountWarnThreshold +
+        ". check running procedures to see if something is stuck.");
+    }
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Roll new state log: " + logId);
     }
@@ -976,38 +1028,33 @@ public class WALProcedureStore extends ProcedureStoreBase {
   // ==========================================================================
   //  Log Files cleaner helpers
   // ==========================================================================
-
-  /**
-   * Iterates over log files from latest (ignoring currently active one) to oldest, deleting the
-   * ones which don't contain anything useful for recovery.
-   * @throws IOException
-   */
   private void removeInactiveLogs() throws IOException {
-    // TODO: can we somehow avoid first iteration (starting from newest log) and still figure out
-    // efficient way to cleanup old logs.
-    // Alternatively, a complex and maybe more efficient method would be using this iteration to
-    // rewrite latest states of active procedures to a new log file and delete all old ones.
-    if (logs.size() <= 1) return;
-    ProcedureStoreTracker runningTracker = new ProcedureStoreTracker();
-    runningTracker.resetTo(storeTracker);
-    List<ProcedureWALFile> logsToBeDeleted = new ArrayList<>();
-    for (int i = logs.size() - 2; i >= 0; i--) {
-      ProcedureWALFile log = logs.get(i);
-      // If nothing was subtracted, delete the log file since it doesn't contain any useful proc
-      // states.
-      if (!runningTracker.subtract(log.getTracker())) {
-        logsToBeDeleted.add(log);
-      }
+    // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
+    // once there is nothing olding the oldest WAL we can remove it.
+    while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
+      removeLogFile(logs.getFirst());
+      buildHoldingCleanupTracker();
     }
-    // Delete the logs from oldest to newest and stop at first log that can't be deleted to avoid
-    // holes in the log file sequence (for better debug capability).
-    while (true) {
-      ProcedureWALFile log = logs.getFirst();
-      if (logsToBeDeleted.contains(log)) {
-        removeLogFile(log);
-      } else {
-        break;
-      }
+
+    // TODO: In case we are holding up a lot of logs for long time we should
+    // rewrite old procedures (in theory parent procs) to the new WAL.
+  }
+
+  private void buildHoldingCleanupTracker() {
+    if (logs.size() <= 1) {
+      // we only have one wal, so nothing to do
+      holdingCleanupTracker.reset();
+      return;
+    }
+
+    // compute the holding tracker.
+    //  - the first WAL is used for the 'updates'
+    //  - the other WALs are scanned to remove procs already in other wals.
+    // TODO: exit early if holdingCleanupTracker.isEmpty()
+    holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
+    holdingCleanupTracker.setDeletedIfSet(storeTracker);
+    for (int i = 1, size = logs.size() - 1; i < size; ++i) {
+      holdingCleanupTracker.setDeletedIfSet(logs.get(i).getTracker());
     }
   }
 
@@ -1020,12 +1067,19 @@ public class WALProcedureStore extends ProcedureStoreBase {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Remove all state logs with ID less than " + lastLogId);
     }
+
+    boolean removed = false;
     while (logs.size() > 1) {
       ProcedureWALFile log = logs.getFirst();
       if (lastLogId < log.getLogId()) {
         break;
       }
       removeLogFile(log);
+      removed = true;
+    }
+
+    if (removed) {
+      buildHoldingCleanupTracker();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/319ecd86/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
index 76fd2c5..550116e 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -106,32 +106,6 @@ public class TestProcedureStoreTracker {
   }
 
   @Test
-  public void testIsTracking() {
-    long[][] procIds = new long[][] {{4, 7}, {1024, 1027}, {8192, 8194}};
-    long[][] checkIds = new long[][] {{2, 8}, {1023, 1025}, {8193, 8191}};
-
-    ProcedureStoreTracker tracker = new ProcedureStoreTracker();
-    for (int i = 0; i < procIds.length; ++i) {
-      long[] seq = procIds[i];
-      tracker.insert(seq[0]);
-      tracker.insert(seq[1]);
-    }
-
-    for (int i = 0; i < procIds.length; ++i) {
-      long[] check = checkIds[i];
-      long[] seq = procIds[i];
-      assertTrue(tracker.isTracking(seq[0], seq[1]));
-      assertTrue(tracker.isTracking(check[0], check[1]));
-      tracker.delete(seq[0]);
-      tracker.delete(seq[1]);
-      assertFalse(tracker.isTracking(seq[0], seq[1]));
-      assertFalse(tracker.isTracking(check[0], check[1]));
-    }
-
-    assertTrue(tracker.isEmpty());
-  }
-
-  @Test
   public void testBasicCRUD() {
     ProcedureStoreTracker tracker = new ProcedureStoreTracker();
     assertTrue(tracker.isEmpty());
@@ -287,64 +261,31 @@ public class TestProcedureStoreTracker {
   }
 
   @Test
-  public void testBitSetNodeSubtract() {
-    // 1 not updated in n2, nothing to subtract
-    BitSetNode n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ });
-    BitSetNode n2 = buildBitSetNode(new long[]{ 1L }, new long[]{}, new long[]{});
-    assertFalse(n1.subtract(n2));
-
-    // 1 updated in n2, and not deleted in n1, should subtract.
-    n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
-    n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
-    assertTrue(n1.subtract(n2));
-
-    // 1 updated in n2, but deleted in n1, should not subtract
-    n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ 1L });
-    n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
-    assertFalse(n1.subtract(n2));
-
-    // 1 updated in n2, but not deleted in n1, should subtract.
-    n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
-    n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ 1L });
-    assertTrue(n1.subtract(n2));
-
-    // all four cases together.
-    n1 = buildBitSetNode(new long[]{ 0L, 10L, 20L, 30L  }, new long[]{ 0L, 10L, 20L, 30L  },
-        new long[]{ 20L });
-    n2 = buildBitSetNode(new long[]{ 0L, 10L, 20L, 30L  }, new long[]{ 0L, 20L, 30L },
-        new long[]{ 0L });
-    assertTrue(n1.subtract(n2));
-  }
+  public void testSetDeletedIfSet() {
+    final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+    final long[] procIds = new long[] { 1, 3, 7, 152, 512, 1024, 1025 };
 
-  @Test
-  // The structure is same as testBitSetNodeSubtract() but the ids are bigger so that internally
-  // there are many BitSetNodes.
-  public void testTrackerSubtract() {
-    // not updated in n2, nothing to subtract
-    ProcedureStoreTracker n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L },
-        new long[]{ });
-    ProcedureStoreTracker n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{}, new long[]{});
-    assertFalse(n1.subtract(n2));
-
-    // updated in n2, and not deleted in n1, should subtract.
-    n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{});
-    n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{});
-    assertTrue(n1.subtract(n2));
-
-    // updated in n2, but also deleted in n1, should not subtract
-    n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L });
-    n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L }, new long[]{});
-    assertFalse(n1.subtract(n2));
-
-    // updated in n2, but not deleted in n1, should subtract.
-    n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{});
-    n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L }, new long[]{ 1L, 1000L });
-    assertFalse(n1.subtract(n2));
-
-    n1 = buildTracker(new long[]{ 0L, 100L, 200L, 300L }, new long[]{ 0L, 100L, 200L, 300L },
-        new long[]{ 200L });
-    n2 = buildTracker(new long[]{ 0L, 100L, 200L, 300L }, new long[]{ 0L, 200L, 300L },
-        new long[]{ 0L });
-    assertTrue(n1.subtract(n2));
+    // test single proc
+    for (int i = 0; i < procIds.length; ++i) {
+      tracker.insert(procIds[i]);
+    }
+    assertEquals(false, tracker.isEmpty());
+
+    for (int i = 0; i < procIds.length; ++i) {
+      tracker.setDeletedIfSet(procIds[i] - 1);
+      tracker.setDeletedIfSet(procIds[i]);
+      tracker.setDeletedIfSet(procIds[i] + 1);
+    }
+    assertEquals(true, tracker.isEmpty());
+
+    // test batch
+    tracker.reset();
+    for (int i = 0; i < procIds.length; ++i) {
+      tracker.insert(procIds[i]);
+    }
+    assertEquals(false, tracker.isEmpty());
+
+    tracker.setDeletedIfSet(procIds);
+    assertEquals(true, tracker.isEmpty());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/319ecd86/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 83f481c..f8c3486 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -31,6 +31,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -72,6 +73,10 @@ public class TestWALProcedureStore {
   private Path testDir;
   private Path logDir;
 
+  private void setupConfig(final Configuration conf) {
+    conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
+  }
+
   @Before
   public void setUp() throws IOException {
     htu = new HBaseCommonTestingUtility();
@@ -79,6 +84,7 @@ public class TestWALProcedureStore {
     fs = testDir.getFileSystem(htu.getConfiguration());
     assertTrue(testDir.depth() > 1);
 
+    setupConfig(htu.getConfiguration());
     logDir = new Path(testDir, "proc-logs");
     procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
     procStore.start(PROCEDURE_STORE_SLOTS);
@@ -101,6 +107,19 @@ public class TestWALProcedureStore {
     for (int i = 0; i < 10; ++i) {
       procStore.periodicRollForTesting();
     }
+    assertEquals(1, procStore.getActiveLogs().size());
+    FileStatus[] status = fs.listStatus(logDir);
+    assertEquals(1, status.length);
+  }
+
+  @Test
+  public void testRestartWithoutData() throws Exception {
+    for (int i = 0; i < 10; ++i) {
+      final LoadCounter loader = new LoadCounter();
+      storeRestart(loader);
+    }
+    LOG.info("ACTIVE WALs " + procStore.getActiveLogs());
+    assertEquals(1, procStore.getActiveLogs().size());
     FileStatus[] status = fs.listStatus(logDir);
     assertEquals(1, status.length);
   }
@@ -126,13 +145,13 @@ public class TestWALProcedureStore {
 
   @Test
   public void testWalCleanerSequentialClean() throws Exception {
-    int NUM = 5;
-    List<Procedure> procs = new ArrayList<>();
+    final Procedure[] procs = new Procedure[5];
     ArrayList<ProcedureWALFile> logs = null;
+
     // Insert procedures and roll wal after every insert.
-    for (int i = 0; i < NUM; i++) {
-      procs.add(new TestSequentialProcedure());
-      procStore.insert(procs.get(i), null);
+    for (int i = 0; i < procs.length; i++) {
+      procs[i] = new TestSequentialProcedure();
+      procStore.insert(procs[i], null);
       procStore.rollWriterForTesting();
       logs = procStore.getActiveLogs();
       assertEquals(logs.size(), i + 2);  // Extra 1 for current ongoing wal.
@@ -140,12 +159,13 @@ public class TestWALProcedureStore {
 
     // Delete procedures in sequential order make sure that only the corresponding wal is deleted
     // from logs list.
-    int[] deleteOrder = new int[]{ 0, 1, 2, 3, 4};
+    final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 };
     for (int i = 0; i < deleteOrder.length; i++) {
-      procStore.delete(procs.get(deleteOrder[i]).getProcId());
+      procStore.delete(procs[deleteOrder[i]].getProcId());
       procStore.removeInactiveLogsForTesting();
-      assertFalse(procStore.getActiveLogs().contains(logs.get(deleteOrder[i])));
-      assertEquals(procStore.getActiveLogs().size(), NUM - i );
+      assertFalse(logs.get(deleteOrder[i]).toString(),
+        procStore.getActiveLogs().contains(logs.get(deleteOrder[i])));
+      assertEquals(procStore.getActiveLogs().size(), procs.length - i);
     }
   }
 
@@ -154,30 +174,29 @@ public class TestWALProcedureStore {
   // they are in the starting of the list.
   @Test
   public void testWalCleanerNoHoles() throws Exception {
-    int NUM = 5;
-    List<Procedure> procs = new ArrayList<>();
+    final Procedure[] procs = new Procedure[5];
     ArrayList<ProcedureWALFile> logs = null;
     // Insert procedures and roll wal after every insert.
-    for (int i = 0; i < NUM; i++) {
-      procs.add(new TestSequentialProcedure());
-      procStore.insert(procs.get(i), null);
+    for (int i = 0; i < procs.length; i++) {
+      procs[i] = new TestSequentialProcedure();
+      procStore.insert(procs[i], null);
       procStore.rollWriterForTesting();
       logs = procStore.getActiveLogs();
-      assertEquals(logs.size(), i + 2);  // Extra 1 for current ongoing wal.
+      assertEquals(i + 2, logs.size());  // Extra 1 for current ongoing wal.
     }
 
-    for (int i = 1; i < NUM; i++) {
-      procStore.delete(procs.get(i).getProcId());
+    for (int i = 1; i < procs.length; i++) {
+      procStore.delete(procs[i].getProcId());
     }
-    assertEquals(procStore.getActiveLogs().size(), NUM + 1);
-    procStore.delete(procs.get(0).getProcId());
-    assertEquals(procStore.getActiveLogs().size(), 1);
+    assertEquals(procs.length + 1, procStore.getActiveLogs().size());
+    procStore.delete(procs[0].getProcId());
+    assertEquals(1, procStore.getActiveLogs().size());
   }
 
   @Test
   public void testWalCleanerUpdates() throws Exception {
-    TestSequentialProcedure p1 = new TestSequentialProcedure(),
-        p2 = new TestSequentialProcedure();
+    TestSequentialProcedure p1 = new TestSequentialProcedure();
+    TestSequentialProcedure p2 = new TestSequentialProcedure();
     procStore.insert(p1, null);
     procStore.insert(p2, null);
     procStore.rollWriterForTesting();
@@ -192,8 +211,8 @@ public class TestWALProcedureStore {
 
   @Test
   public void testWalCleanerUpdatesDontLeaveHoles() throws Exception {
-    TestSequentialProcedure p1 = new TestSequentialProcedure(),
-        p2 = new TestSequentialProcedure();
+    TestSequentialProcedure p1 = new TestSequentialProcedure();
+    TestSequentialProcedure p2 = new TestSequentialProcedure();
     procStore.insert(p1, null);
     procStore.insert(p2, null);
     procStore.rollWriterForTesting();  // generates first log with p1 + p2
@@ -214,6 +233,36 @@ public class TestWALProcedureStore {
   }
 
   @Test
+  public void testWalCleanerWithEmptyRolls() throws Exception {
+    final Procedure[] procs = new Procedure[3];
+    for (int i = 0; i < procs.length; ++i) {
+      procs[i] = new TestSequentialProcedure();
+      procStore.insert(procs[i], null);
+    }
+    assertEquals(1, procStore.getActiveLogs().size());
+    procStore.rollWriterForTesting();
+    assertEquals(2, procStore.getActiveLogs().size());
+    procStore.rollWriterForTesting();
+    assertEquals(3, procStore.getActiveLogs().size());
+
+    for (int i = 0; i < procs.length; ++i) {
+      procStore.update(procs[i]);
+      procStore.rollWriterForTesting();
+      procStore.rollWriterForTesting();
+      if (i < (procs.length - 1)) {
+        assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size());
+      }
+    }
+    assertEquals(7, procStore.getActiveLogs().size());
+
+    for (int i = 0; i < procs.length; ++i) {
+      procStore.delete(procs[i].getProcId());
+      assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size());
+    }
+    assertEquals(1, procStore.getActiveLogs().size());
+  }
+
+  @Test
   public void testEmptyLogLoad() throws Exception {
     LoadCounter loader = new LoadCounter();
     storeRestart(loader);
@@ -294,6 +343,8 @@ public class TestWALProcedureStore {
     }
 
     // Test Load 1
+    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
+    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
     LoadCounter loader = new LoadCounter();
     storeRestart(loader);
     assertEquals(1, loader.getLoadedCount());
@@ -360,8 +411,8 @@ public class TestWALProcedureStore {
     assertEquals(0, loader.getCorruptedCount());
   }
 
-  void assertUpdated(final ProcedureStoreTracker tracker, Procedure[] procs,
-      int[] updatedProcs, int[] nonUpdatedProcs) {
+  private static void assertUpdated(final ProcedureStoreTracker tracker,
+      final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
     for (int index : updatedProcs) {
       long procId = procs[index].getProcId();
       assertTrue("Procedure id : " + procId, tracker.isUpdated(procId));
@@ -372,8 +423,8 @@ public class TestWALProcedureStore {
     }
   }
 
-  void assertDeleted(final ProcedureStoreTracker tracker, Procedure[] procs,
-      int[] deletedProcs, int[] nonDeletedProcs) {
+  private static void assertDeleted(final ProcedureStoreTracker tracker,
+      final Procedure[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) {
     for (int index : deletedProcs) {
       long procId = procs[index].getProcId();
       assertEquals("Procedure id : " + procId,
@@ -423,7 +474,8 @@ public class TestWALProcedureStore {
       corruptLog(logs[i], 4);
     }
 
-    // Restart the store
+    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
+    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
     final LoadCounter loader = new LoadCounter();
     storeRestart(loader);
     assertEquals(3, loader.getLoadedCount());  // procs 1, 3 and 5
@@ -431,6 +483,7 @@ public class TestWALProcedureStore {
 
     // Check the Trackers
     final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
+    LOG.info("WALs " + walFiles);
     assertEquals(4, walFiles.size());
     LOG.info("Checking wal " + walFiles.get(0));
     assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5});
@@ -660,7 +713,7 @@ public class TestWALProcedureStore {
 
   @Test
   public void testFileNotFoundDuringLeaseRecovery() throws IOException {
-    TestProcedure[] procs = new TestProcedure[3];
+    final TestProcedure[] procs = new TestProcedure[3];
     for (int i = 0; i < procs.length; ++i) {
       procs[i] = new TestProcedure(i + 1, 0);
       procStore.insert(procs[i], null);
@@ -673,7 +726,7 @@ public class TestWALProcedureStore {
     procStore.stop(false);
 
     FileStatus[] status = fs.listStatus(logDir);
-    assertEquals(procs.length + 2, status.length);
+    assertEquals(procs.length + 1, status.length);
 
     // simulate another active master removing the wals
     procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir,
@@ -696,7 +749,7 @@ public class TestWALProcedureStore {
     procStore.recoverLease();
     procStore.load(loader);
     assertEquals(procs.length, loader.getMaxProcId());
-    assertEquals(procs.length - 1, loader.getRunnableCount());
+    assertEquals(1, loader.getRunnableCount());
     assertEquals(0, loader.getCompletedCount());
     assertEquals(0, loader.getCorruptedCount());
   }