You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/11/03 00:37:13 UTC

hbase git commit: HBASE-21351 The force update thread may have race with PE worker when the procedure is rolling back

Repository: hbase
Updated Branches:
  refs/heads/master 25c964e9a -> 62fe36593


HBASE-21351 The force update thread may have race with PE worker when the procedure is rolling back


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/62fe3659
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/62fe3659
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/62fe3659

Branch: refs/heads/master
Commit: 62fe3659349800b0fcbfcafd140ae6a3f57c5804
Parents: 25c964e
Author: zhangduo <zh...@apache.org>
Authored: Fri Nov 2 19:56:16 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Nov 3 08:24:11 2018 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/util/IdLock.java    |  45 ++-
 .../procedure2/CompletedProcedureCleaner.java   | 138 ++++++++
 .../procedure2/CompletedProcedureRetainer.java  |  55 ++++
 .../hbase/procedure2/FailedProcedure.java       |  78 +++++
 .../hbase/procedure2/ProcedureExecutor.java     | 323 ++++++-------------
 .../procedure2/TestForceUpdateProcedure.java    | 204 ++++++++++++
 .../store/wal/TestForceUpdateProcedure.java     | 246 --------------
 .../hadoop/hbase/HBaseTestingUtility.java       |   1 +
 8 files changed, 604 insertions(+), 486 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/62fe3659/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
index c4adfbf..9e5692f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
@@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -42,20 +44,24 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class IdLock {
 
+  private static final Logger LOG = LoggerFactory.getLogger(IdLock.class);
+
   /** An entry returned to the client as a lock object */
   public static final class Entry {
     private final long id;
     private int numWaiters;
     private boolean locked = true;
+    private Thread holder;
 
-    private Entry(long id) {
+    private Entry(long id, Thread holder) {
       this.id = id;
+      this.holder = holder;
     }
 
     @Override
     public String toString() {
       return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
-          + locked;
+          + locked + ", holder=" + holder;
     }
   }
 
@@ -70,7 +76,8 @@ public class IdLock {
    * @throws IOException if interrupted
    */
   public Entry getLockEntry(long id) throws IOException {
-    Entry entry = new Entry(id);
+    Thread currentThread = Thread.currentThread();
+    Entry entry = new Entry(id, currentThread);
     Entry existing;
     while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
       synchronized (existing) {
@@ -99,6 +106,7 @@ public class IdLock {
 
           --existing.numWaiters;  // Remove ourselves from waiters.
           existing.locked = true;
+          existing.holder = currentThread;
           return existing;
         }
         // If the entry is not locked, it might already be deleted from the
@@ -120,7 +128,8 @@ public class IdLock {
    */
   public Entry tryLockEntry(long id, long time) throws IOException {
     Preconditions.checkArgument(time >= 0);
-    Entry entry = new Entry(id);
+    Thread currentThread = Thread.currentThread();
+    Entry entry = new Entry(id, currentThread);
     Entry existing;
     long waitUtilTS = System.currentTimeMillis() + time;
     long remaining = time;
@@ -158,6 +167,7 @@ public class IdLock {
             --existing.numWaiters;  // Remove ourselves from waiters.
           }
           existing.locked = true;
+          existing.holder = currentThread;
           return existing;
         }
         // If the entry is not locked, it might already be deleted from the
@@ -169,14 +179,17 @@ public class IdLock {
   }
 
   /**
-   * Must be called in a finally block to decrease the internal counter and
-   * remove the monitor object for the given id if the caller is the last
-   * client.
-   *
+   * Must be called in a finally block to decrease the internal counter and remove the monitor
+   * object for the given id if the caller is the last client.
    * @param entry the return value of {@link #getLockEntry(long)}
    */
   public void releaseLockEntry(Entry entry) {
+    Thread currentThread = Thread.currentThread();
     synchronized (entry) {
+      if (entry.holder != currentThread) {
+        LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread,
+          entry);
+      }
       entry.locked = false;
       if (entry.numWaiters > 0) {
         entry.notify();
@@ -186,7 +199,21 @@ public class IdLock {
     }
   }
 
-  /** For testing */
+  /**
+   * Test whether the given id is already locked by the current thread.
+   */
+  public boolean isHeldByCurrentThread(long id) {
+    Thread currentThread = Thread.currentThread();
+    Entry entry = map.get(id);
+    if (entry == null) {
+      return false;
+    }
+    synchronized (entry) {
+      return currentThread.equals(entry.holder);
+    }
+  }
+
+  @VisibleForTesting
   void assertMapEmpty() {
     assert map.isEmpty();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/62fe3659/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java
new file mode 100644
index 0000000..e51b77b
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.NonceKey;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal cleaner that removes the completed procedure results after a TTL.
+ * <p/>
+ * NOTE: This is a special case handled in timeoutLoop().
+ * <p/>
+ * Since the client code looks more or less like:
+ *
+ * <pre>
+ *   procId = master.doOperation()
+ *   while (master.getProcResult(procId) == ProcInProgress);
+ * </pre>
+ *
+ * The master should not throw away the proc result as soon as the procedure is done, but should
+ * wait for a result request from the client (see executor.removeResult(procId)). The client will
+ * call something like master.isProcDone() or master.getProcResult(), which will return the
+ * result/state to the client and mark the completed proc as ready to delete. Note that the client
+ * may not receive the response from the master (e.g. master failover), so if we delay the real
+ * deletion of the proc result a bit, the client will be able to get the result on the next try.
+ */
+@InterfaceAudience.Private
+class CompletedProcedureCleaner<TEnvironment> extends ProcedureInMemoryChore<TEnvironment> {
+  private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);
+
+  static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
+  private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
+
+  private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
+  private static final int DEFAULT_BATCH_SIZE = 32;
+
+  private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
+  private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
+  private final ProcedureStore store;
+  private final IdLock procExecutionLock;
+  private Configuration conf;
+
+  public CompletedProcedureCleaner(Configuration conf, ProcedureStore store,
+      IdLock procExecutionLock, Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
+      Map<NonceKey, Long> nonceKeysToProcIdsMap) {
+    // set the timeout interval that triggers the periodic-procedure
+    super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
+    this.completed = completedMap;
+    this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
+    this.store = store;
+    this.procExecutionLock = procExecutionLock;
+    this.conf = conf;
+  }
+
+  @Override
+  protected void periodicExecute(final TEnvironment env) {
+    if (completed.isEmpty()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("No completed procedures to cleanup.");
+      }
+      return;
+    }
+
+    final long evictTtl =
+      conf.getInt(ProcedureExecutor.EVICT_TTL_CONF_KEY, ProcedureExecutor.DEFAULT_EVICT_TTL);
+    final long evictAckTtl = conf.getInt(ProcedureExecutor.EVICT_ACKED_TTL_CONF_KEY,
+      ProcedureExecutor.DEFAULT_ACKED_EVICT_TTL);
+    final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
+
+    final long[] batchIds = new long[batchSize];
+    int batchCount = 0;
+
+    final long now = EnvironmentEdgeManager.currentTime();
+    final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
+      completed.entrySet().iterator();
+    while (it.hasNext() && store.isRunning()) {
+      final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
+      final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
+      final Procedure<?> proc = retainer.getProcedure();
+      IdLock.Entry lockEntry;
+      try {
+        lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
+      } catch (IOException e) {
+        // can only happen if interrupted, so not a big deal to propagate it
+        throw new UncheckedIOException(e);
+      }
+      try {
+        // TODO: Select TTL based on Procedure type
+        if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
+          // Failed procedures aren't persisted in WAL.
+          if (!(proc instanceof FailedProcedure)) {
+            batchIds[batchCount++] = entry.getKey();
+            if (batchCount == batchIds.length) {
+              store.delete(batchIds, 0, batchCount);
+              batchCount = 0;
+            }
+          }
+          final NonceKey nonceKey = proc.getNonceKey();
+          if (nonceKey != null) {
+            nonceKeysToProcIdsMap.remove(nonceKey);
+          }
+          it.remove();
+          LOG.trace("Evict completed {}", proc);
+        }
+      } finally {
+        procExecutionLock.releaseLockEntry(lockEntry);
+      }
+    }
+    if (batchCount > 0) {
+      store.delete(batchIds, 0, batchCount);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/62fe3659/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureRetainer.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureRetainer.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureRetainer.java
new file mode 100644
index 0000000..d5f1ee7
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureRetainer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Holds the reference to a completed root procedure. It will be cleaned up after it has expired.
+ */
+@InterfaceAudience.Private
+class CompletedProcedureRetainer<TEnvironment> {
+  private final Procedure<TEnvironment> procedure;
+  private long clientAckTime;
+
+  public CompletedProcedureRetainer(Procedure<TEnvironment> procedure) {
+    this.procedure = procedure;
+    clientAckTime = -1;
+  }
+
+  public Procedure<TEnvironment> getProcedure() {
+    return procedure;
+  }
+
+  public boolean hasClientAckTime() {
+    return clientAckTime != -1;
+  }
+
+  public long getClientAckTime() {
+    return clientAckTime;
+  }
+
+  public void setClientAckTime(long clientAckTime) {
+    this.clientAckTime = clientAckTime;
+  }
+
+  public boolean isExpired(long now, long evictTtl, long evictAckTtl) {
+    return (hasClientAckTime() && (now - getClientAckTime()) >= evictAckTtl) ||
+      (now - procedure.getLastUpdate()) >= evictTtl;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/62fe3659/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/FailedProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/FailedProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/FailedProcedure.java
new file mode 100644
index 0000000..8e7db44
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/FailedProcedure.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.Objects;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NonceKey;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+@InterfaceAudience.Private
+class FailedProcedure<TEnvironment> extends Procedure<TEnvironment> {
+
+  private String procName;
+
+  public FailedProcedure() {
+  }
+
+  public FailedProcedure(long procId, String procName, User owner, NonceKey nonceKey,
+      IOException exception) {
+    this.procName = procName;
+    setProcId(procId);
+    setState(ProcedureState.ROLLEDBACK);
+    setOwner(owner);
+    setNonceKey(nonceKey);
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    setSubmittedTime(currentTime);
+    setLastUpdate(currentTime);
+    setFailure(Objects.toString(exception.getMessage(), ""), exception);
+  }
+
+  @Override
+  public String getProcName() {
+    return procName;
+  }
+
+  @Override
+  protected Procedure<TEnvironment>[] execute(TEnvironment env)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void rollback(TEnvironment env) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean abort(TEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/62fe3659/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index f0affd2..d02ca6e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -18,16 +18,14 @@
 package org.apache.hadoop.hbase.procedure2;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -87,6 +85,12 @@ public class ProcedureExecutor<TEnvironment> {
       "hbase.procedure.worker.keep.alive.time.msec";
   private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
 
+  public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
+  static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
+
+  public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
+  static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
+
   /**
    * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
    * break PE having it fail at various junctures. When non-null, testing is set to an instance of
@@ -155,134 +159,6 @@ public class ProcedureExecutor<TEnvironment> {
     void procedureFinished(long procId);
   }
 
-  private static final class CompletedProcedureRetainer<TEnvironment> {
-    private final Procedure<TEnvironment> procedure;
-    private long clientAckTime;
-
-    public CompletedProcedureRetainer(Procedure<TEnvironment> procedure) {
-      this.procedure = procedure;
-      clientAckTime = -1;
-    }
-
-    public Procedure<TEnvironment> getProcedure() {
-      return procedure;
-    }
-
-    public boolean hasClientAckTime() {
-      return clientAckTime != -1;
-    }
-
-    public long getClientAckTime() {
-      return clientAckTime;
-    }
-
-    public void setClientAckTime(long clientAckTime) {
-      this.clientAckTime = clientAckTime;
-    }
-
-    public boolean isExpired(long now, long evictTtl, long evictAckTtl) {
-      return (hasClientAckTime() && (now - getClientAckTime()) >= evictAckTtl) ||
-        (now - procedure.getLastUpdate()) >= evictTtl;
-    }
-  }
-
-  /**
-   * Internal cleaner that removes the completed procedure results after a TTL.
-   * NOTE: This is a special case handled in timeoutLoop().
-   *
-   * <p>Since the client code looks more or less like:
-   * <pre>
-   *   procId = master.doOperation()
-   *   while (master.getProcResult(procId) == ProcInProgress);
-   * </pre>
-   * The master should not throw away the proc result as soon as the procedure is done
-   * but should wait a result request from the client (see executor.removeResult(procId))
-   * The client will call something like master.isProcDone() or master.getProcResult()
-   * which will return the result/state to the client, and it will mark the completed
-   * proc as ready to delete. note that the client may not receive the response from
-   * the master (e.g. master failover) so, if we delay a bit the real deletion of
-   * the proc result the client will be able to get the result the next try.
-   */
-  private static class CompletedProcedureCleaner<TEnvironment>
-      extends ProcedureInMemoryChore<TEnvironment> {
-    private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);
-
-    private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
-    private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
-
-    private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
-    private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
-
-    private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
-    private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
-
-    private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
-    private static final int DEFAULT_BATCH_SIZE = 32;
-
-    private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
-    private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
-    private final ProcedureStore store;
-    private Configuration conf;
-
-    public CompletedProcedureCleaner(Configuration conf, final ProcedureStore store,
-        final Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
-        final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
-      // set the timeout interval that triggers the periodic-procedure
-      super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
-      this.completed = completedMap;
-      this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
-      this.store = store;
-      this.conf = conf;
-    }
-
-    @Override
-    protected void periodicExecute(final TEnvironment env) {
-      if (completed.isEmpty()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("No completed procedures to cleanup.");
-        }
-        return;
-      }
-
-      final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
-      final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
-      final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
-
-      final long[] batchIds = new long[batchSize];
-      int batchCount = 0;
-
-      final long now = EnvironmentEdgeManager.currentTime();
-      final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
-        completed.entrySet().iterator();
-      while (it.hasNext() && store.isRunning()) {
-        final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
-        final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
-        final Procedure<?> proc = retainer.getProcedure();
-
-        // TODO: Select TTL based on Procedure type
-        if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
-          // Failed procedures aren't persisted in WAL.
-          if (!(proc instanceof FailedProcedure)) {
-            batchIds[batchCount++] = entry.getKey();
-            if (batchCount == batchIds.length) {
-              store.delete(batchIds, 0, batchCount);
-              batchCount = 0;
-            }
-          }
-          final NonceKey nonceKey = proc.getNonceKey();
-          if (nonceKey != null) {
-            nonceKeysToProcIdsMap.remove(nonceKey);
-          }
-          it.remove();
-          LOG.trace("Evict completed {}", proc);
-        }
-      }
-      if (batchCount > 0) {
-        store.delete(batchIds, 0, batchCount);
-      }
-    }
-  }
-
   /**
    * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
    * Once a Root-Procedure completes (success or failure), the result will be added to this map.
@@ -385,15 +261,26 @@ public class ProcedureExecutor<TEnvironment> {
     IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
     try {
       Procedure<TEnvironment> proc = procedures.get(procId);
-      if (proc == null) {
-        LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
-        return;
-      }
-      // For a sub procedure which root parent has not been finished, we still need to retain the
-      // wal even if the procedure itself is finished.
-      if (proc.isFinished() && (!proc.hasParent() || isRootFinished(proc))) {
-        LOG.debug("Procedure {} has already been finished, skip force updating.", proc);
-        return;
+      if (proc != null) {
+        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
+          LOG.debug("Procedure {} has already been finished and parent is succeeded," +
+            " skip force updating", proc);
+          return;
+        }
+      } else {
+        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
+        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
+          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
+          return;
+        }
+        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
+        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
+        if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) {
+          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
+            procId);
+          return;
+        }
+        proc = retainer.getProcedure();
       }
       LOG.debug("Force update procedure {}", proc);
       store.update(proc);
@@ -731,7 +618,8 @@ public class ProcedureExecutor<TEnvironment> {
     timeoutExecutor.add(new WorkerMonitor());
 
     // Add completed cleaner chore
-    addChore(new CompletedProcedureCleaner<>(conf, store, completed, nonceKeysToProcIdsMap));
+    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
+      nonceKeysToProcIdsMap));
   }
 
   public void stop() {
@@ -918,59 +806,6 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  public static class FailedProcedure<TEnvironment> extends Procedure<TEnvironment> {
-    private String procName;
-
-    public FailedProcedure() {
-    }
-
-    public FailedProcedure(long procId, String procName, User owner,
-        NonceKey nonceKey, IOException exception) {
-      this.procName = procName;
-      setProcId(procId);
-      setState(ProcedureState.ROLLEDBACK);
-      setOwner(owner);
-      setNonceKey(nonceKey);
-      long currentTime = EnvironmentEdgeManager.currentTime();
-      setSubmittedTime(currentTime);
-      setLastUpdate(currentTime);
-      setFailure(Objects.toString(exception.getMessage(), ""), exception);
-    }
-
-    @Override
-    public String getProcName() {
-      return procName;
-    }
-
-    @Override
-    protected Procedure<TEnvironment>[] execute(TEnvironment env)
-        throws ProcedureYieldException, ProcedureSuspendedException,
-        InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void rollback(TEnvironment env)
-        throws IOException, InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(TEnvironment env) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer)
-        throws IOException {
-    }
-
-    @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer)
-        throws IOException {
-    }
-  }
-
   /**
    * If the failure failed before submitting it, we may want to give back the
    * same error to the requests with the same nonceKey.
@@ -1616,53 +1451,74 @@ public class ProcedureExecutor<TEnvironment> {
     int stackTail = subprocStack.size();
     while (stackTail-- > 0) {
       Procedure<TEnvironment> proc = subprocStack.get(stackTail);
-      // For the sub procedures which are successfully finished, we do not rollback them.
-      // Typically, if we want to rollback a procedure, we first need to rollback it, and then
-      // recursively rollback its ancestors. The state changes which are done by sub procedures
-      // should be handled by parent procedures when rolling back. For example, when rolling back a
-      // MergeTableProcedure, we will schedule new procedures to bring the offline regions online,
-      // instead of rolling back the original procedures which offlined the regions(in fact these
-      // procedures can not be rolled back...).
-      if (proc.isSuccess()) {
-        // Just do the cleanup work, without actually executing the rollback
-        subprocStack.remove(stackTail);
-        cleanupAfterRollbackOneStep(proc);
-        continue;
-      }
-      LockState lockState = acquireLock(proc);
-      if (lockState != LockState.LOCK_ACQUIRED) {
-        // can't take a lock on the procedure, add the root-proc back on the
-        // queue waiting for the lock availability
-        return lockState;
+      IdLock.Entry lockEntry = null;
+      // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
+      // this check, as the worker will hold the lock before executing a procedure. This is the only
+      // place where we may hold two procedure execution locks, and there is a fence in the
+      // RootProcedureState where we can make sure that only one worker can execute the rollback of
+      // a RootProcedureState, so there is no deadlock problem. And the lock here is necessary to
+      // prevent race between us and the force update thread.
+      if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
+        try {
+          lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
+        } catch (IOException e) {
+          // can only happen if interrupted, so not a big deal to propagate it
+          throw new UncheckedIOException(e);
+        }
       }
+      try {
+        // For the sub procedures which are successfully finished, we do not rollback them.
+        // Typically, if we want to rollback a procedure, we first need to rollback it, and then
+        // recursively rollback its ancestors. The state changes which are done by sub procedures
+        // should be handled by parent procedures when rolling back. For example, when rolling back
+        // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
+        // online, instead of rolling back the original procedures which offlined the regions(in
+        // fact these procedures can not be rolled back...).
+        if (proc.isSuccess()) {
+          // Just do the cleanup work, without actually executing the rollback
+          subprocStack.remove(stackTail);
+          cleanupAfterRollbackOneStep(proc);
+          continue;
+        }
+        LockState lockState = acquireLock(proc);
+        if (lockState != LockState.LOCK_ACQUIRED) {
+          // can't take a lock on the procedure, add the root-proc back on the
+          // queue waiting for the lock availability
+          return lockState;
+        }
 
-      lockState = executeRollback(proc);
-      releaseLock(proc, false);
-      boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
-      abortRollback |= !isRunning() || !store.isRunning();
+        lockState = executeRollback(proc);
+        releaseLock(proc, false);
+        boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
+        abortRollback |= !isRunning() || !store.isRunning();
 
-      // allows to kill the executor before something is stored to the wal.
-      // useful to test the procedure recovery.
-      if (abortRollback) {
-        return lockState;
-      }
+        // allows to kill the executor before something is stored to the wal.
+        // useful to test the procedure recovery.
+        if (abortRollback) {
+          return lockState;
+        }
 
-      subprocStack.remove(stackTail);
+        subprocStack.remove(stackTail);
 
-      // if the procedure is kind enough to pass the slot to someone else, yield
-      // if the proc is already finished, do not yield
-      if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
-        return LockState.LOCK_YIELD_WAIT;
-      }
+        // if the procedure is kind enough to pass the slot to someone else, yield
+        // if the proc is already finished, do not yield
+        if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
+          return LockState.LOCK_YIELD_WAIT;
+        }
 
-      if (proc != rootProc) {
-        execCompletionCleanup(proc);
+        if (proc != rootProc) {
+          execCompletionCleanup(proc);
+        }
+      } finally {
+        if (lockEntry != null) {
+          procExecutionLock.releaseLockEntry(lockEntry);
+        }
       }
     }
 
     // Finalize the procedure state
-    LOG.info("Rolled back " + rootProc +
-             " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()));
+    LOG.info("Rolled back {} exec-time={}", rootProc,
+      StringUtils.humanTimeDiff(rootProc.elapsedTime()));
     procedureFinished(rootProc);
     return LockState.LOCK_ACQUIRED;
   }
@@ -2046,6 +1902,11 @@ public class ProcedureExecutor<TEnvironment> {
     return scheduler;
   }
 
+  @VisibleForTesting
+  int getCompletedSize() {
+    return completed.size();
+  }
+
   // ==========================================================================
   //  Worker Thread
   // ==========================================================================

http://git-wip-us.apache.org/repos/asf/hbase/blob/62fe3659/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestForceUpdateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestForceUpdateProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestForceUpdateProcedure.java
new file mode 100644
index 0000000..178b0cb
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestForceUpdateProcedure.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Exchanger;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestForceUpdateProcedure {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestForceUpdateProcedure.class);
+
+  private static HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
+
+  private static WALProcedureStore STORE;
+
+  private static ProcedureExecutor<Void> EXEC;
+
+  private static Exchanger<Boolean> EXCHANGER = new Exchanger<>();
+
+  private static int WAL_COUNT = 5;
+
+  @Rule
+  public final TestName name = new TestName();
+
+  private void createStoreAndExecutor() throws IOException {
+    UTIL.getConfiguration().setInt(CompletedProcedureCleaner.CLEANER_INTERVAL_CONF_KEY, 1000);
+    Path logDir = UTIL.getDataTestDir(name.getMethodName());
+    STORE = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
+    STORE.start(1);
+    EXEC = new ProcedureExecutor<Void>(UTIL.getConfiguration(), null, STORE);
+    ProcedureTestingUtility.initAndStartWorkers(EXEC, 1, true);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws IOException {
+    UTIL.getConfiguration().setInt(WALProcedureStore.WAL_COUNT_WARN_THRESHOLD_CONF_KEY, WAL_COUNT);
+  }
+
+  private void stopStoreAndExecutor() {
+    EXEC.stop();
+    STORE.stop(false);
+    EXEC = null;
+    STORE = null;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException {
+    UTIL.cleanupTestDir();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    createStoreAndExecutor();
+  }
+
+  @After
+  public void tearDown() {
+    stopStoreAndExecutor();
+  }
+
+  public static final class WaitingProcedure extends NoopProcedure<Void> {
+
+    @Override
+    protected Procedure<Void>[] execute(Void env)
+        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+      EXCHANGER.exchange(Boolean.TRUE);
+      setState(ProcedureState.WAITING_TIMEOUT);
+      setTimeout(Integer.MAX_VALUE);
+      throw new ProcedureSuspendedException();
+    }
+  }
+
+  public static final class ParentProcedure extends NoopProcedure<Void> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Procedure<Void>[] execute(Void env)
+        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+      return new Procedure[] { new NoopProcedure<>(), new WaitingProcedure() };
+    }
+  }
+
+  public static final class ExchangeProcedure extends NoopProcedure<Void> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Procedure<Void>[] execute(Void env)
+        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+      if (EXCHANGER.exchange(Boolean.TRUE)) {
+        return new Procedure[] { this };
+      } else {
+        return null;
+      }
+    }
+  }
+
+  public static final class NoopNoAckProcedure extends NoopProcedure<Void> {
+
+    @Override
+    protected boolean shouldWaitClientAck(Void env) {
+      return false;
+    }
+  }
+
+  @Test
+  public void testProcedureStuck() throws IOException, InterruptedException {
+    EXEC.submitProcedure(new ParentProcedure());
+    EXCHANGER.exchange(Boolean.TRUE);
+    UTIL.waitFor(10000, () -> EXEC.getActiveExecutorCount() == 0);
+    // The above operations are used to make sure that we have persisted the states of the two
+    // procedures.
+    long procId = EXEC.submitProcedure(new ExchangeProcedure());
+    assertEquals(1, STORE.getActiveLogs().size());
+    for (int i = 0; i < WAL_COUNT - 1; i++) {
+      assertTrue(STORE.rollWriterForTesting());
+      // The WaitingProcedure never gets updated so we can not delete the oldest wal file, so the
+      // number of wal files will increase
+      assertEquals(2 + i, STORE.getActiveLogs().size());
+      EXCHANGER.exchange(Boolean.TRUE);
+      Thread.sleep(1000);
+    }
+    STORE.rollWriterForTesting();
+    // Finish the ExchangeProcedure
+    EXCHANGER.exchange(Boolean.FALSE);
+    // Make sure that we can delete several wal files because we force update the state of
+    // WaitingProcedure. Notice that the last closed wal file can not be deleted, as when rolling
+    // the newest wal file does not have anything in it, and in the closed file we still have the
+    // state for the ExchangeProcedure so it can not be deleted
+    UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 2);
+    UTIL.waitFor(10000, () -> EXEC.isFinished(procId));
+    // Make sure that after the force update we could still load the procedures
+    stopStoreAndExecutor();
+    createStoreAndExecutor();
+    Map<Class<?>, Procedure<Void>> procMap = new HashMap<>();
+    EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p));
+    assertEquals(3, procMap.size());
+    ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class);
+    assertEquals(ProcedureState.WAITING, parentProc.getState());
+    WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class);
+    assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
+    NoopProcedure<Void> noopProc = (NoopProcedure<Void>) procMap.get(NoopProcedure.class);
+    assertEquals(ProcedureState.SUCCESS, noopProc.getState());
+  }
+
+  @Test
+  public void testCompletedProcedure() throws InterruptedException, IOException {
+    long procId = EXEC.submitProcedure(new ExchangeProcedure());
+    EXCHANGER.exchange(Boolean.FALSE);
+    UTIL.waitFor(10000, () -> EXEC.isFinished(procId));
+    for (int i = 0; i < WAL_COUNT - 1; i++) {
+      assertTrue(STORE.rollWriterForTesting());
+      // The exchange procedure is completed but still not deleted yet so we can not delete the
+      // oldest wal file
+      long pid = EXEC.submitProcedure(new NoopNoAckProcedure());
+      assertEquals(2 + i, STORE.getActiveLogs().size());
+      UTIL.waitFor(10000, () -> EXEC.isFinished(pid));
+    }
+    // Only the exchange procedure can not be deleted
+    UTIL.waitFor(10000, () -> EXEC.getCompletedSize() == 1);
+    STORE.rollWriterForTesting();
+    UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/62fe3659/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java
deleted file mode 100644
index df6ee51..0000000
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.procedure2.store.wal;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Exchanger;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
-import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
-
-@Category({ MasterTests.class, SmallTests.class })
-public class TestForceUpdateProcedure {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestForceUpdateProcedure.class);
-
-  private static HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
-
-  private static WALProcedureStore STORE;
-
-  private static ProcedureExecutor<Void> EXEC;
-
-  private static Exchanger<Boolean> EXCHANGER = new Exchanger<>();
-
-  private static int WAL_COUNT = 5;
-
-  private static void createStoreAndExecutor() throws IOException {
-    Path logDir = UTIL.getDataTestDir("proc-wals");
-    STORE = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
-    STORE.start(1);
-    EXEC = new ProcedureExecutor<Void>(UTIL.getConfiguration(), null, STORE);
-    ProcedureTestingUtility.initAndStartWorkers(EXEC, 1, true);
-  }
-
-  @BeforeClass
-  public static void setUp() throws IOException {
-    UTIL.getConfiguration().setInt(WALProcedureStore.WAL_COUNT_WARN_THRESHOLD_CONF_KEY, WAL_COUNT);
-    createStoreAndExecutor();
-  }
-
-  private static void stopStoreAndExecutor() {
-    EXEC.stop();
-    STORE.stop(false);
-    EXEC = null;
-    STORE = null;
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    stopStoreAndExecutor();
-    UTIL.cleanupTestDir();
-  }
-
-  public static final class WaitingProcedure extends Procedure<Void> {
-
-    @Override
-    protected Procedure<Void>[] execute(Void env)
-        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
-      EXCHANGER.exchange(Boolean.TRUE);
-      setState(ProcedureState.WAITING_TIMEOUT);
-      setTimeout(Integer.MAX_VALUE);
-      throw new ProcedureSuspendedException();
-    }
-
-    @Override
-    protected void rollback(Void env) throws IOException, InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(Void env) {
-      return false;
-    }
-
-    @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-
-    @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-  }
-
-  public static final class ParentProcedure extends Procedure<Void> {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected Procedure<Void>[] execute(Void env)
-        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
-      return new Procedure[] { new DummyProcedure(), new WaitingProcedure() };
-    }
-
-    @Override
-    protected void rollback(Void env) throws IOException, InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(Void env) {
-      return false;
-    }
-
-    @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-
-    @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-  }
-
-  public static final class DummyProcedure extends Procedure<Void> {
-
-    @Override
-    protected Procedure<Void>[] execute(Void env)
-        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
-      return null;
-    }
-
-    @Override
-    protected void rollback(Void env) throws IOException, InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(Void env) {
-      return false;
-    }
-
-    @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-
-    @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-  }
-
-  public static final class ExchangeProcedure extends Procedure<Void> {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected Procedure<Void>[] execute(Void env)
-        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
-      if (EXCHANGER.exchange(Boolean.TRUE)) {
-        return new Procedure[] { this };
-      } else {
-        return null;
-      }
-    }
-
-    @Override
-    protected void rollback(Void env) throws IOException, InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(Void env) {
-      return false;
-    }
-
-    @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-
-    @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-  }
-
-  @Test
-  public void test() throws IOException, InterruptedException {
-    EXEC.submitProcedure(new ParentProcedure());
-    EXCHANGER.exchange(Boolean.TRUE);
-    UTIL.waitFor(10000, () -> EXEC.getActiveExecutorCount() == 0);
-    // The above operations are used to make sure that we have persist the states of the two
-    // procedures.
-    long procId = EXEC.submitProcedure(new ExchangeProcedure());
-    assertEquals(1, STORE.getActiveLogs().size());
-    for (int i = 0; i < WAL_COUNT - 1; i++) {
-      assertTrue(STORE.rollWriterForTesting());
-      // The WaitinProcedure never gets updated so we can not delete the oldest wal file, so the
-      // number of wal files will increase
-      assertEquals(2 + i, STORE.getActiveLogs().size());
-      EXCHANGER.exchange(Boolean.TRUE);
-      Thread.sleep(1000);
-    }
-    STORE.rollWriterForTesting();
-    // Finish the ExchangeProcedure
-    EXCHANGER.exchange(Boolean.FALSE);
-    // Make sure that we can delete several wal files because we force update the state of
-    // WaitingProcedure. Notice that the last closed wal files can not be deleted, as when rolling
-    // the newest wal file does not have anything in it, and in the closed file we still have the
-    // state for the ExchangeProcedure so it can not be deleted
-    UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 2);
-    UTIL.waitFor(10000, () -> EXEC.isFinished(procId));
-    // Make sure that after the force update we could still load the procedures
-    stopStoreAndExecutor();
-    createStoreAndExecutor();
-    Map<Class<?>, Procedure<Void>> procMap = new HashMap<>();
-    EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p));
-    assertEquals(3, procMap.size());
-    ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class);
-    assertEquals(ProcedureState.WAITING, parentProc.getState());
-    WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class);
-    assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
-    DummyProcedure dummyProc = (DummyProcedure) procMap.get(DummyProcedure.class);
-    assertEquals(ProcedureState.SUCCESS, dummyProc.getState());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/62fe3659/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 7f5e11a..3f383d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
  * avoiding port contention if another local HBase instance is already running).
  * <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
  * setting it to true.
+ * For triggering pre commit
  */
 @InterfaceAudience.Public
 @SuppressWarnings("deprecation")