You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by gi...@apache.org on 2018/11/13 14:52:19 UTC

[14/26] hbase-site git commit: Published site at 64c4861272aa03f714b4029ae7725f4286b77062.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/b9b09fec/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.KeepAliveWorkerThread.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.KeepAliveWorkerThread.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.KeepAliveWorkerThread.html
index 9fd7e0b..fb34e18 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.KeepAliveWorkerThread.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.KeepAliveWorkerThread.html
@@ -1923,188 +1923,193 @@
 <span class="sourceLineNo">1915</span>    return completed.size();<a name="line.1915"></a>
 <span class="sourceLineNo">1916</span>  }<a name="line.1916"></a>
 <span class="sourceLineNo">1917</span><a name="line.1917"></a>
-<span class="sourceLineNo">1918</span>  // ==========================================================================<a name="line.1918"></a>
-<span class="sourceLineNo">1919</span>  //  Worker Thread<a name="line.1919"></a>
-<span class="sourceLineNo">1920</span>  // ==========================================================================<a name="line.1920"></a>
-<span class="sourceLineNo">1921</span>  private class WorkerThread extends StoppableThread {<a name="line.1921"></a>
-<span class="sourceLineNo">1922</span>    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);<a name="line.1922"></a>
-<span class="sourceLineNo">1923</span>    private volatile Procedure&lt;TEnvironment&gt; activeProcedure;<a name="line.1923"></a>
-<span class="sourceLineNo">1924</span><a name="line.1924"></a>
-<span class="sourceLineNo">1925</span>    public WorkerThread(ThreadGroup group) {<a name="line.1925"></a>
-<span class="sourceLineNo">1926</span>      this(group, "PEWorker-");<a name="line.1926"></a>
-<span class="sourceLineNo">1927</span>    }<a name="line.1927"></a>
-<span class="sourceLineNo">1928</span><a name="line.1928"></a>
-<span class="sourceLineNo">1929</span>    protected WorkerThread(ThreadGroup group, String prefix) {<a name="line.1929"></a>
-<span class="sourceLineNo">1930</span>      super(group, prefix + workerId.incrementAndGet());<a name="line.1930"></a>
-<span class="sourceLineNo">1931</span>      setDaemon(true);<a name="line.1931"></a>
+<span class="sourceLineNo">1918</span>  @VisibleForTesting<a name="line.1918"></a>
+<span class="sourceLineNo">1919</span>  public IdLock getProcExecutionLock() {<a name="line.1919"></a>
+<span class="sourceLineNo">1920</span>    return procExecutionLock;<a name="line.1920"></a>
+<span class="sourceLineNo">1921</span>  }<a name="line.1921"></a>
+<span class="sourceLineNo">1922</span><a name="line.1922"></a>
+<span class="sourceLineNo">1923</span>  // ==========================================================================<a name="line.1923"></a>
+<span class="sourceLineNo">1924</span>  //  Worker Thread<a name="line.1924"></a>
+<span class="sourceLineNo">1925</span>  // ==========================================================================<a name="line.1925"></a>
+<span class="sourceLineNo">1926</span>  private class WorkerThread extends StoppableThread {<a name="line.1926"></a>
+<span class="sourceLineNo">1927</span>    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);<a name="line.1927"></a>
+<span class="sourceLineNo">1928</span>    private volatile Procedure&lt;TEnvironment&gt; activeProcedure;<a name="line.1928"></a>
+<span class="sourceLineNo">1929</span><a name="line.1929"></a>
+<span class="sourceLineNo">1930</span>    public WorkerThread(ThreadGroup group) {<a name="line.1930"></a>
+<span class="sourceLineNo">1931</span>      this(group, "PEWorker-");<a name="line.1931"></a>
 <span class="sourceLineNo">1932</span>    }<a name="line.1932"></a>
 <span class="sourceLineNo">1933</span><a name="line.1933"></a>
-<span class="sourceLineNo">1934</span>    @Override<a name="line.1934"></a>
-<span class="sourceLineNo">1935</span>    public void sendStopSignal() {<a name="line.1935"></a>
-<span class="sourceLineNo">1936</span>      scheduler.signalAll();<a name="line.1936"></a>
+<span class="sourceLineNo">1934</span>    protected WorkerThread(ThreadGroup group, String prefix) {<a name="line.1934"></a>
+<span class="sourceLineNo">1935</span>      super(group, prefix + workerId.incrementAndGet());<a name="line.1935"></a>
+<span class="sourceLineNo">1936</span>      setDaemon(true);<a name="line.1936"></a>
 <span class="sourceLineNo">1937</span>    }<a name="line.1937"></a>
-<span class="sourceLineNo">1938</span>    @Override<a name="line.1938"></a>
-<span class="sourceLineNo">1939</span>    public void run() {<a name="line.1939"></a>
-<span class="sourceLineNo">1940</span>      long lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1940"></a>
-<span class="sourceLineNo">1941</span>      try {<a name="line.1941"></a>
-<span class="sourceLineNo">1942</span>        while (isRunning() &amp;&amp; keepAlive(lastUpdate)) {<a name="line.1942"></a>
-<span class="sourceLineNo">1943</span>          @SuppressWarnings("unchecked")<a name="line.1943"></a>
-<span class="sourceLineNo">1944</span>          Procedure&lt;TEnvironment&gt; proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1944"></a>
-<span class="sourceLineNo">1945</span>          if (proc == null) {<a name="line.1945"></a>
-<span class="sourceLineNo">1946</span>            continue;<a name="line.1946"></a>
-<span class="sourceLineNo">1947</span>          }<a name="line.1947"></a>
-<span class="sourceLineNo">1948</span>          this.activeProcedure = proc;<a name="line.1948"></a>
-<span class="sourceLineNo">1949</span>          int activeCount = activeExecutorCount.incrementAndGet();<a name="line.1949"></a>
-<span class="sourceLineNo">1950</span>          int runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1950"></a>
-<span class="sourceLineNo">1951</span>          LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1951"></a>
-<span class="sourceLineNo">1952</span>            runningCount, activeCount);<a name="line.1952"></a>
-<span class="sourceLineNo">1953</span>          executionStartTime.set(EnvironmentEdgeManager.currentTime());<a name="line.1953"></a>
-<span class="sourceLineNo">1954</span>          IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());<a name="line.1954"></a>
-<span class="sourceLineNo">1955</span>          try {<a name="line.1955"></a>
-<span class="sourceLineNo">1956</span>            executeProcedure(proc);<a name="line.1956"></a>
-<span class="sourceLineNo">1957</span>          } catch (AssertionError e) {<a name="line.1957"></a>
-<span class="sourceLineNo">1958</span>            LOG.info("ASSERT pid=" + proc.getProcId(), e);<a name="line.1958"></a>
-<span class="sourceLineNo">1959</span>            throw e;<a name="line.1959"></a>
-<span class="sourceLineNo">1960</span>          } finally {<a name="line.1960"></a>
-<span class="sourceLineNo">1961</span>            procExecutionLock.releaseLockEntry(lockEntry);<a name="line.1961"></a>
-<span class="sourceLineNo">1962</span>            activeCount = activeExecutorCount.decrementAndGet();<a name="line.1962"></a>
-<span class="sourceLineNo">1963</span>            runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1963"></a>
-<span class="sourceLineNo">1964</span>            LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1964"></a>
-<span class="sourceLineNo">1965</span>              runningCount, activeCount);<a name="line.1965"></a>
-<span class="sourceLineNo">1966</span>            this.activeProcedure = null;<a name="line.1966"></a>
-<span class="sourceLineNo">1967</span>            lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1967"></a>
-<span class="sourceLineNo">1968</span>            executionStartTime.set(Long.MAX_VALUE);<a name="line.1968"></a>
-<span class="sourceLineNo">1969</span>          }<a name="line.1969"></a>
-<span class="sourceLineNo">1970</span>        }<a name="line.1970"></a>
-<span class="sourceLineNo">1971</span>      } catch (Throwable t) {<a name="line.1971"></a>
-<span class="sourceLineNo">1972</span>        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);<a name="line.1972"></a>
-<span class="sourceLineNo">1973</span>      } finally {<a name="line.1973"></a>
-<span class="sourceLineNo">1974</span>        LOG.trace("Worker terminated.");<a name="line.1974"></a>
-<span class="sourceLineNo">1975</span>      }<a name="line.1975"></a>
-<span class="sourceLineNo">1976</span>      workerThreads.remove(this);<a name="line.1976"></a>
-<span class="sourceLineNo">1977</span>    }<a name="line.1977"></a>
-<span class="sourceLineNo">1978</span><a name="line.1978"></a>
-<span class="sourceLineNo">1979</span>    @Override<a name="line.1979"></a>
-<span class="sourceLineNo">1980</span>    public String toString() {<a name="line.1980"></a>
-<span class="sourceLineNo">1981</span>      Procedure&lt;?&gt; p = this.activeProcedure;<a name="line.1981"></a>
-<span class="sourceLineNo">1982</span>      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1982"></a>
-<span class="sourceLineNo">1983</span>    }<a name="line.1983"></a>
-<span class="sourceLineNo">1984</span><a name="line.1984"></a>
-<span class="sourceLineNo">1985</span>    /**<a name="line.1985"></a>
-<span class="sourceLineNo">1986</span>     * @return the time since the current procedure is running<a name="line.1986"></a>
-<span class="sourceLineNo">1987</span>     */<a name="line.1987"></a>
-<span class="sourceLineNo">1988</span>    public long getCurrentRunTime() {<a name="line.1988"></a>
-<span class="sourceLineNo">1989</span>      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a name="line.1989"></a>
-<span class="sourceLineNo">1990</span>    }<a name="line.1990"></a>
-<span class="sourceLineNo">1991</span><a name="line.1991"></a>
-<span class="sourceLineNo">1992</span>    // core worker never timeout<a name="line.1992"></a>
-<span class="sourceLineNo">1993</span>    protected boolean keepAlive(long lastUpdate) {<a name="line.1993"></a>
-<span class="sourceLineNo">1994</span>      return true;<a name="line.1994"></a>
+<span class="sourceLineNo">1938</span><a name="line.1938"></a>
+<span class="sourceLineNo">1939</span>    @Override<a name="line.1939"></a>
+<span class="sourceLineNo">1940</span>    public void sendStopSignal() {<a name="line.1940"></a>
+<span class="sourceLineNo">1941</span>      scheduler.signalAll();<a name="line.1941"></a>
+<span class="sourceLineNo">1942</span>    }<a name="line.1942"></a>
+<span class="sourceLineNo">1943</span>    @Override<a name="line.1943"></a>
+<span class="sourceLineNo">1944</span>    public void run() {<a name="line.1944"></a>
+<span class="sourceLineNo">1945</span>      long lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1945"></a>
+<span class="sourceLineNo">1946</span>      try {<a name="line.1946"></a>
+<span class="sourceLineNo">1947</span>        while (isRunning() &amp;&amp; keepAlive(lastUpdate)) {<a name="line.1947"></a>
+<span class="sourceLineNo">1948</span>          @SuppressWarnings("unchecked")<a name="line.1948"></a>
+<span class="sourceLineNo">1949</span>          Procedure&lt;TEnvironment&gt; proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1949"></a>
+<span class="sourceLineNo">1950</span>          if (proc == null) {<a name="line.1950"></a>
+<span class="sourceLineNo">1951</span>            continue;<a name="line.1951"></a>
+<span class="sourceLineNo">1952</span>          }<a name="line.1952"></a>
+<span class="sourceLineNo">1953</span>          this.activeProcedure = proc;<a name="line.1953"></a>
+<span class="sourceLineNo">1954</span>          int activeCount = activeExecutorCount.incrementAndGet();<a name="line.1954"></a>
+<span class="sourceLineNo">1955</span>          int runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1955"></a>
+<span class="sourceLineNo">1956</span>          LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1956"></a>
+<span class="sourceLineNo">1957</span>            runningCount, activeCount);<a name="line.1957"></a>
+<span class="sourceLineNo">1958</span>          executionStartTime.set(EnvironmentEdgeManager.currentTime());<a name="line.1958"></a>
+<span class="sourceLineNo">1959</span>          IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());<a name="line.1959"></a>
+<span class="sourceLineNo">1960</span>          try {<a name="line.1960"></a>
+<span class="sourceLineNo">1961</span>            executeProcedure(proc);<a name="line.1961"></a>
+<span class="sourceLineNo">1962</span>          } catch (AssertionError e) {<a name="line.1962"></a>
+<span class="sourceLineNo">1963</span>            LOG.info("ASSERT pid=" + proc.getProcId(), e);<a name="line.1963"></a>
+<span class="sourceLineNo">1964</span>            throw e;<a name="line.1964"></a>
+<span class="sourceLineNo">1965</span>          } finally {<a name="line.1965"></a>
+<span class="sourceLineNo">1966</span>            procExecutionLock.releaseLockEntry(lockEntry);<a name="line.1966"></a>
+<span class="sourceLineNo">1967</span>            activeCount = activeExecutorCount.decrementAndGet();<a name="line.1967"></a>
+<span class="sourceLineNo">1968</span>            runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1968"></a>
+<span class="sourceLineNo">1969</span>            LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1969"></a>
+<span class="sourceLineNo">1970</span>              runningCount, activeCount);<a name="line.1970"></a>
+<span class="sourceLineNo">1971</span>            this.activeProcedure = null;<a name="line.1971"></a>
+<span class="sourceLineNo">1972</span>            lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1972"></a>
+<span class="sourceLineNo">1973</span>            executionStartTime.set(Long.MAX_VALUE);<a name="line.1973"></a>
+<span class="sourceLineNo">1974</span>          }<a name="line.1974"></a>
+<span class="sourceLineNo">1975</span>        }<a name="line.1975"></a>
+<span class="sourceLineNo">1976</span>      } catch (Throwable t) {<a name="line.1976"></a>
+<span class="sourceLineNo">1977</span>        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);<a name="line.1977"></a>
+<span class="sourceLineNo">1978</span>      } finally {<a name="line.1978"></a>
+<span class="sourceLineNo">1979</span>        LOG.trace("Worker terminated.");<a name="line.1979"></a>
+<span class="sourceLineNo">1980</span>      }<a name="line.1980"></a>
+<span class="sourceLineNo">1981</span>      workerThreads.remove(this);<a name="line.1981"></a>
+<span class="sourceLineNo">1982</span>    }<a name="line.1982"></a>
+<span class="sourceLineNo">1983</span><a name="line.1983"></a>
+<span class="sourceLineNo">1984</span>    @Override<a name="line.1984"></a>
+<span class="sourceLineNo">1985</span>    public String toString() {<a name="line.1985"></a>
+<span class="sourceLineNo">1986</span>      Procedure&lt;?&gt; p = this.activeProcedure;<a name="line.1986"></a>
+<span class="sourceLineNo">1987</span>      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1987"></a>
+<span class="sourceLineNo">1988</span>    }<a name="line.1988"></a>
+<span class="sourceLineNo">1989</span><a name="line.1989"></a>
+<span class="sourceLineNo">1990</span>    /**<a name="line.1990"></a>
+<span class="sourceLineNo">1991</span>     * @return the time since the current procedure is running<a name="line.1991"></a>
+<span class="sourceLineNo">1992</span>     */<a name="line.1992"></a>
+<span class="sourceLineNo">1993</span>    public long getCurrentRunTime() {<a name="line.1993"></a>
+<span class="sourceLineNo">1994</span>      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a name="line.1994"></a>
 <span class="sourceLineNo">1995</span>    }<a name="line.1995"></a>
-<span class="sourceLineNo">1996</span>  }<a name="line.1996"></a>
-<span class="sourceLineNo">1997</span><a name="line.1997"></a>
-<span class="sourceLineNo">1998</span>  // A worker thread which can be added when core workers are stuck. Will timeout after<a name="line.1998"></a>
-<span class="sourceLineNo">1999</span>  // keepAliveTime if there is no procedure to run.<a name="line.1999"></a>
-<span class="sourceLineNo">2000</span>  private final class KeepAliveWorkerThread extends WorkerThread {<a name="line.2000"></a>
-<span class="sourceLineNo">2001</span><a name="line.2001"></a>
-<span class="sourceLineNo">2002</span>    public KeepAliveWorkerThread(ThreadGroup group) {<a name="line.2002"></a>
-<span class="sourceLineNo">2003</span>      super(group, "KeepAlivePEWorker-");<a name="line.2003"></a>
-<span class="sourceLineNo">2004</span>    }<a name="line.2004"></a>
-<span class="sourceLineNo">2005</span><a name="line.2005"></a>
-<span class="sourceLineNo">2006</span>    @Override<a name="line.2006"></a>
-<span class="sourceLineNo">2007</span>    protected boolean keepAlive(long lastUpdate) {<a name="line.2007"></a>
-<span class="sourceLineNo">2008</span>      return EnvironmentEdgeManager.currentTime() - lastUpdate &lt; keepAliveTime;<a name="line.2008"></a>
+<span class="sourceLineNo">1996</span><a name="line.1996"></a>
+<span class="sourceLineNo">1997</span>    // core worker never timeout<a name="line.1997"></a>
+<span class="sourceLineNo">1998</span>    protected boolean keepAlive(long lastUpdate) {<a name="line.1998"></a>
+<span class="sourceLineNo">1999</span>      return true;<a name="line.1999"></a>
+<span class="sourceLineNo">2000</span>    }<a name="line.2000"></a>
+<span class="sourceLineNo">2001</span>  }<a name="line.2001"></a>
+<span class="sourceLineNo">2002</span><a name="line.2002"></a>
+<span class="sourceLineNo">2003</span>  // A worker thread which can be added when core workers are stuck. Will timeout after<a name="line.2003"></a>
+<span class="sourceLineNo">2004</span>  // keepAliveTime if there is no procedure to run.<a name="line.2004"></a>
+<span class="sourceLineNo">2005</span>  private final class KeepAliveWorkerThread extends WorkerThread {<a name="line.2005"></a>
+<span class="sourceLineNo">2006</span><a name="line.2006"></a>
+<span class="sourceLineNo">2007</span>    public KeepAliveWorkerThread(ThreadGroup group) {<a name="line.2007"></a>
+<span class="sourceLineNo">2008</span>      super(group, "KeepAlivePEWorker-");<a name="line.2008"></a>
 <span class="sourceLineNo">2009</span>    }<a name="line.2009"></a>
-<span class="sourceLineNo">2010</span>  }<a name="line.2010"></a>
-<span class="sourceLineNo">2011</span><a name="line.2011"></a>
-<span class="sourceLineNo">2012</span>  // ----------------------------------------------------------------------------<a name="line.2012"></a>
-<span class="sourceLineNo">2013</span>  // TODO-MAYBE: Should we provide a InlineChore to notify the store with the<a name="line.2013"></a>
-<span class="sourceLineNo">2014</span>  // full set of procedures pending and completed to write a compacted<a name="line.2014"></a>
-<span class="sourceLineNo">2015</span>  // version of the log (in case is a log)?<a name="line.2015"></a>
-<span class="sourceLineNo">2016</span>  // In theory no, procedures are have a short life, so at some point the store<a name="line.2016"></a>
-<span class="sourceLineNo">2017</span>  // will have the tracker saying everything is in the last log.<a name="line.2017"></a>
-<span class="sourceLineNo">2018</span>  // ----------------------------------------------------------------------------<a name="line.2018"></a>
-<span class="sourceLineNo">2019</span><a name="line.2019"></a>
-<span class="sourceLineNo">2020</span>  private final class WorkerMonitor extends InlineChore {<a name="line.2020"></a>
-<span class="sourceLineNo">2021</span>    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.2021"></a>
-<span class="sourceLineNo">2022</span>        "hbase.procedure.worker.monitor.interval.msec";<a name="line.2022"></a>
-<span class="sourceLineNo">2023</span>    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.2023"></a>
+<span class="sourceLineNo">2010</span><a name="line.2010"></a>
+<span class="sourceLineNo">2011</span>    @Override<a name="line.2011"></a>
+<span class="sourceLineNo">2012</span>    protected boolean keepAlive(long lastUpdate) {<a name="line.2012"></a>
+<span class="sourceLineNo">2013</span>      return EnvironmentEdgeManager.currentTime() - lastUpdate &lt; keepAliveTime;<a name="line.2013"></a>
+<span class="sourceLineNo">2014</span>    }<a name="line.2014"></a>
+<span class="sourceLineNo">2015</span>  }<a name="line.2015"></a>
+<span class="sourceLineNo">2016</span><a name="line.2016"></a>
+<span class="sourceLineNo">2017</span>  // ----------------------------------------------------------------------------<a name="line.2017"></a>
+<span class="sourceLineNo">2018</span>  // TODO-MAYBE: Should we provide a InlineChore to notify the store with the<a name="line.2018"></a>
+<span class="sourceLineNo">2019</span>  // full set of procedures pending and completed to write a compacted<a name="line.2019"></a>
+<span class="sourceLineNo">2020</span>  // version of the log (in case is a log)?<a name="line.2020"></a>
+<span class="sourceLineNo">2021</span>  // In theory no, procedures are have a short life, so at some point the store<a name="line.2021"></a>
+<span class="sourceLineNo">2022</span>  // will have the tracker saying everything is in the last log.<a name="line.2022"></a>
+<span class="sourceLineNo">2023</span>  // ----------------------------------------------------------------------------<a name="line.2023"></a>
 <span class="sourceLineNo">2024</span><a name="line.2024"></a>
-<span class="sourceLineNo">2025</span>    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.2025"></a>
-<span class="sourceLineNo">2026</span>        "hbase.procedure.worker.stuck.threshold.msec";<a name="line.2026"></a>
-<span class="sourceLineNo">2027</span>    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.2027"></a>
-<span class="sourceLineNo">2028</span><a name="line.2028"></a>
-<span class="sourceLineNo">2029</span>    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.2029"></a>
-<span class="sourceLineNo">2030</span>        "hbase.procedure.worker.add.stuck.percentage";<a name="line.2030"></a>
-<span class="sourceLineNo">2031</span>    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.2031"></a>
-<span class="sourceLineNo">2032</span><a name="line.2032"></a>
-<span class="sourceLineNo">2033</span>    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a name="line.2033"></a>
-<span class="sourceLineNo">2034</span>    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.2034"></a>
-<span class="sourceLineNo">2035</span>    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.2035"></a>
-<span class="sourceLineNo">2036</span><a name="line.2036"></a>
-<span class="sourceLineNo">2037</span>    public WorkerMonitor() {<a name="line.2037"></a>
-<span class="sourceLineNo">2038</span>      refreshConfig();<a name="line.2038"></a>
-<span class="sourceLineNo">2039</span>    }<a name="line.2039"></a>
-<span class="sourceLineNo">2040</span><a name="line.2040"></a>
-<span class="sourceLineNo">2041</span>    @Override<a name="line.2041"></a>
-<span class="sourceLineNo">2042</span>    public void run() {<a name="line.2042"></a>
-<span class="sourceLineNo">2043</span>      final int stuckCount = checkForStuckWorkers();<a name="line.2043"></a>
-<span class="sourceLineNo">2044</span>      checkThreadCount(stuckCount);<a name="line.2044"></a>
+<span class="sourceLineNo">2025</span>  private final class WorkerMonitor extends InlineChore {<a name="line.2025"></a>
+<span class="sourceLineNo">2026</span>    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.2026"></a>
+<span class="sourceLineNo">2027</span>        "hbase.procedure.worker.monitor.interval.msec";<a name="line.2027"></a>
+<span class="sourceLineNo">2028</span>    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.2028"></a>
+<span class="sourceLineNo">2029</span><a name="line.2029"></a>
+<span class="sourceLineNo">2030</span>    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.2030"></a>
+<span class="sourceLineNo">2031</span>        "hbase.procedure.worker.stuck.threshold.msec";<a name="line.2031"></a>
+<span class="sourceLineNo">2032</span>    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.2032"></a>
+<span class="sourceLineNo">2033</span><a name="line.2033"></a>
+<span class="sourceLineNo">2034</span>    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.2034"></a>
+<span class="sourceLineNo">2035</span>        "hbase.procedure.worker.add.stuck.percentage";<a name="line.2035"></a>
+<span class="sourceLineNo">2036</span>    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.2036"></a>
+<span class="sourceLineNo">2037</span><a name="line.2037"></a>
+<span class="sourceLineNo">2038</span>    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a name="line.2038"></a>
+<span class="sourceLineNo">2039</span>    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.2039"></a>
+<span class="sourceLineNo">2040</span>    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.2040"></a>
+<span class="sourceLineNo">2041</span><a name="line.2041"></a>
+<span class="sourceLineNo">2042</span>    public WorkerMonitor() {<a name="line.2042"></a>
+<span class="sourceLineNo">2043</span>      refreshConfig();<a name="line.2043"></a>
+<span class="sourceLineNo">2044</span>    }<a name="line.2044"></a>
 <span class="sourceLineNo">2045</span><a name="line.2045"></a>
-<span class="sourceLineNo">2046</span>      // refresh interval (poor man dynamic conf update)<a name="line.2046"></a>
-<span class="sourceLineNo">2047</span>      refreshConfig();<a name="line.2047"></a>
-<span class="sourceLineNo">2048</span>    }<a name="line.2048"></a>
-<span class="sourceLineNo">2049</span><a name="line.2049"></a>
-<span class="sourceLineNo">2050</span>    private int checkForStuckWorkers() {<a name="line.2050"></a>
-<span class="sourceLineNo">2051</span>      // check if any of the worker is stuck<a name="line.2051"></a>
-<span class="sourceLineNo">2052</span>      int stuckCount = 0;<a name="line.2052"></a>
-<span class="sourceLineNo">2053</span>      for (WorkerThread worker : workerThreads) {<a name="line.2053"></a>
-<span class="sourceLineNo">2054</span>        if (worker.getCurrentRunTime() &lt; stuckThreshold) {<a name="line.2054"></a>
-<span class="sourceLineNo">2055</span>          continue;<a name="line.2055"></a>
-<span class="sourceLineNo">2056</span>        }<a name="line.2056"></a>
-<span class="sourceLineNo">2057</span><a name="line.2057"></a>
-<span class="sourceLineNo">2058</span>        // WARN the worker is stuck<a name="line.2058"></a>
-<span class="sourceLineNo">2059</span>        stuckCount++;<a name="line.2059"></a>
-<span class="sourceLineNo">2060</span>        LOG.warn("Worker stuck {}, run time {}", worker,<a name="line.2060"></a>
-<span class="sourceLineNo">2061</span>          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.2061"></a>
-<span class="sourceLineNo">2062</span>      }<a name="line.2062"></a>
-<span class="sourceLineNo">2063</span>      return stuckCount;<a name="line.2063"></a>
-<span class="sourceLineNo">2064</span>    }<a name="line.2064"></a>
-<span class="sourceLineNo">2065</span><a name="line.2065"></a>
-<span class="sourceLineNo">2066</span>    private void checkThreadCount(final int stuckCount) {<a name="line.2066"></a>
-<span class="sourceLineNo">2067</span>      // nothing to do if there are no runnable tasks<a name="line.2067"></a>
-<span class="sourceLineNo">2068</span>      if (stuckCount &lt; 1 || !scheduler.hasRunnables()) {<a name="line.2068"></a>
-<span class="sourceLineNo">2069</span>        return;<a name="line.2069"></a>
-<span class="sourceLineNo">2070</span>      }<a name="line.2070"></a>
-<span class="sourceLineNo">2071</span><a name="line.2071"></a>
-<span class="sourceLineNo">2072</span>      // add a new thread if the worker stuck percentage exceed the threshold limit<a name="line.2072"></a>
-<span class="sourceLineNo">2073</span>      // and every handler is active.<a name="line.2073"></a>
-<span class="sourceLineNo">2074</span>      final float stuckPerc = ((float) stuckCount) / workerThreads.size();<a name="line.2074"></a>
-<span class="sourceLineNo">2075</span>      // let's add new worker thread more aggressively, as they will timeout finally if there is no<a name="line.2075"></a>
-<span class="sourceLineNo">2076</span>      // work to do.<a name="line.2076"></a>
-<span class="sourceLineNo">2077</span>      if (stuckPerc &gt;= addWorkerStuckPercentage &amp;&amp; workerThreads.size() &lt; maxPoolSize) {<a name="line.2077"></a>
-<span class="sourceLineNo">2078</span>        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);<a name="line.2078"></a>
-<span class="sourceLineNo">2079</span>        workerThreads.add(worker);<a name="line.2079"></a>
-<span class="sourceLineNo">2080</span>        worker.start();<a name="line.2080"></a>
-<span class="sourceLineNo">2081</span>        LOG.debug("Added new worker thread {}", worker);<a name="line.2081"></a>
-<span class="sourceLineNo">2082</span>      }<a name="line.2082"></a>
-<span class="sourceLineNo">2083</span>    }<a name="line.2083"></a>
-<span class="sourceLineNo">2084</span><a name="line.2084"></a>
-<span class="sourceLineNo">2085</span>    private void refreshConfig() {<a name="line.2085"></a>
-<span class="sourceLineNo">2086</span>      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2086"></a>
-<span class="sourceLineNo">2087</span>          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2087"></a>
-<span class="sourceLineNo">2088</span>      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2088"></a>
-<span class="sourceLineNo">2089</span>        DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2089"></a>
-<span class="sourceLineNo">2090</span>      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2090"></a>
-<span class="sourceLineNo">2091</span>        DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2091"></a>
-<span class="sourceLineNo">2092</span>    }<a name="line.2092"></a>
-<span class="sourceLineNo">2093</span><a name="line.2093"></a>
-<span class="sourceLineNo">2094</span>    @Override<a name="line.2094"></a>
-<span class="sourceLineNo">2095</span>    public int getTimeoutInterval() {<a name="line.2095"></a>
-<span class="sourceLineNo">2096</span>      return timeoutInterval;<a name="line.2096"></a>
+<span class="sourceLineNo">2046</span>    @Override<a name="line.2046"></a>
+<span class="sourceLineNo">2047</span>    public void run() {<a name="line.2047"></a>
+<span class="sourceLineNo">2048</span>      final int stuckCount = checkForStuckWorkers();<a name="line.2048"></a>
+<span class="sourceLineNo">2049</span>      checkThreadCount(stuckCount);<a name="line.2049"></a>
+<span class="sourceLineNo">2050</span><a name="line.2050"></a>
+<span class="sourceLineNo">2051</span>      // refresh interval (poor man dynamic conf update)<a name="line.2051"></a>
+<span class="sourceLineNo">2052</span>      refreshConfig();<a name="line.2052"></a>
+<span class="sourceLineNo">2053</span>    }<a name="line.2053"></a>
+<span class="sourceLineNo">2054</span><a name="line.2054"></a>
+<span class="sourceLineNo">2055</span>    private int checkForStuckWorkers() {<a name="line.2055"></a>
+<span class="sourceLineNo">2056</span>      // check if any of the worker is stuck<a name="line.2056"></a>
+<span class="sourceLineNo">2057</span>      int stuckCount = 0;<a name="line.2057"></a>
+<span class="sourceLineNo">2058</span>      for (WorkerThread worker : workerThreads) {<a name="line.2058"></a>
+<span class="sourceLineNo">2059</span>        if (worker.getCurrentRunTime() &lt; stuckThreshold) {<a name="line.2059"></a>
+<span class="sourceLineNo">2060</span>          continue;<a name="line.2060"></a>
+<span class="sourceLineNo">2061</span>        }<a name="line.2061"></a>
+<span class="sourceLineNo">2062</span><a name="line.2062"></a>
+<span class="sourceLineNo">2063</span>        // WARN the worker is stuck<a name="line.2063"></a>
+<span class="sourceLineNo">2064</span>        stuckCount++;<a name="line.2064"></a>
+<span class="sourceLineNo">2065</span>        LOG.warn("Worker stuck {}, run time {}", worker,<a name="line.2065"></a>
+<span class="sourceLineNo">2066</span>          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.2066"></a>
+<span class="sourceLineNo">2067</span>      }<a name="line.2067"></a>
+<span class="sourceLineNo">2068</span>      return stuckCount;<a name="line.2068"></a>
+<span class="sourceLineNo">2069</span>    }<a name="line.2069"></a>
+<span class="sourceLineNo">2070</span><a name="line.2070"></a>
+<span class="sourceLineNo">2071</span>    private void checkThreadCount(final int stuckCount) {<a name="line.2071"></a>
+<span class="sourceLineNo">2072</span>      // nothing to do if there are no runnable tasks<a name="line.2072"></a>
+<span class="sourceLineNo">2073</span>      if (stuckCount &lt; 1 || !scheduler.hasRunnables()) {<a name="line.2073"></a>
+<span class="sourceLineNo">2074</span>        return;<a name="line.2074"></a>
+<span class="sourceLineNo">2075</span>      }<a name="line.2075"></a>
+<span class="sourceLineNo">2076</span><a name="line.2076"></a>
+<span class="sourceLineNo">2077</span>      // add a new thread if the worker stuck percentage exceed the threshold limit<a name="line.2077"></a>
+<span class="sourceLineNo">2078</span>      // and every handler is active.<a name="line.2078"></a>
+<span class="sourceLineNo">2079</span>      final float stuckPerc = ((float) stuckCount) / workerThreads.size();<a name="line.2079"></a>
+<span class="sourceLineNo">2080</span>      // let's add new worker thread more aggressively, as they will timeout finally if there is no<a name="line.2080"></a>
+<span class="sourceLineNo">2081</span>      // work to do.<a name="line.2081"></a>
+<span class="sourceLineNo">2082</span>      if (stuckPerc &gt;= addWorkerStuckPercentage &amp;&amp; workerThreads.size() &lt; maxPoolSize) {<a name="line.2082"></a>
+<span class="sourceLineNo">2083</span>        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);<a name="line.2083"></a>
+<span class="sourceLineNo">2084</span>        workerThreads.add(worker);<a name="line.2084"></a>
+<span class="sourceLineNo">2085</span>        worker.start();<a name="line.2085"></a>
+<span class="sourceLineNo">2086</span>        LOG.debug("Added new worker thread {}", worker);<a name="line.2086"></a>
+<span class="sourceLineNo">2087</span>      }<a name="line.2087"></a>
+<span class="sourceLineNo">2088</span>    }<a name="line.2088"></a>
+<span class="sourceLineNo">2089</span><a name="line.2089"></a>
+<span class="sourceLineNo">2090</span>    private void refreshConfig() {<a name="line.2090"></a>
+<span class="sourceLineNo">2091</span>      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2091"></a>
+<span class="sourceLineNo">2092</span>          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2092"></a>
+<span class="sourceLineNo">2093</span>      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2093"></a>
+<span class="sourceLineNo">2094</span>        DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2094"></a>
+<span class="sourceLineNo">2095</span>      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2095"></a>
+<span class="sourceLineNo">2096</span>        DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2096"></a>
 <span class="sourceLineNo">2097</span>    }<a name="line.2097"></a>
-<span class="sourceLineNo">2098</span>  }<a name="line.2098"></a>
-<span class="sourceLineNo">2099</span>}<a name="line.2099"></a>
+<span class="sourceLineNo">2098</span><a name="line.2098"></a>
+<span class="sourceLineNo">2099</span>    @Override<a name="line.2099"></a>
+<span class="sourceLineNo">2100</span>    public int getTimeoutInterval() {<a name="line.2100"></a>
+<span class="sourceLineNo">2101</span>      return timeoutInterval;<a name="line.2101"></a>
+<span class="sourceLineNo">2102</span>    }<a name="line.2102"></a>
+<span class="sourceLineNo">2103</span>  }<a name="line.2103"></a>
+<span class="sourceLineNo">2104</span>}<a name="line.2104"></a>
 
 
 

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/b9b09fec/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.ProcedureExecutorListener.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.ProcedureExecutorListener.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.ProcedureExecutorListener.html
index 9fd7e0b..fb34e18 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.ProcedureExecutorListener.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.ProcedureExecutorListener.html
@@ -1923,188 +1923,193 @@
 <span class="sourceLineNo">1915</span>    return completed.size();<a name="line.1915"></a>
 <span class="sourceLineNo">1916</span>  }<a name="line.1916"></a>
 <span class="sourceLineNo">1917</span><a name="line.1917"></a>
-<span class="sourceLineNo">1918</span>  // ==========================================================================<a name="line.1918"></a>
-<span class="sourceLineNo">1919</span>  //  Worker Thread<a name="line.1919"></a>
-<span class="sourceLineNo">1920</span>  // ==========================================================================<a name="line.1920"></a>
-<span class="sourceLineNo">1921</span>  private class WorkerThread extends StoppableThread {<a name="line.1921"></a>
-<span class="sourceLineNo">1922</span>    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);<a name="line.1922"></a>
-<span class="sourceLineNo">1923</span>    private volatile Procedure&lt;TEnvironment&gt; activeProcedure;<a name="line.1923"></a>
-<span class="sourceLineNo">1924</span><a name="line.1924"></a>
-<span class="sourceLineNo">1925</span>    public WorkerThread(ThreadGroup group) {<a name="line.1925"></a>
-<span class="sourceLineNo">1926</span>      this(group, "PEWorker-");<a name="line.1926"></a>
-<span class="sourceLineNo">1927</span>    }<a name="line.1927"></a>
-<span class="sourceLineNo">1928</span><a name="line.1928"></a>
-<span class="sourceLineNo">1929</span>    protected WorkerThread(ThreadGroup group, String prefix) {<a name="line.1929"></a>
-<span class="sourceLineNo">1930</span>      super(group, prefix + workerId.incrementAndGet());<a name="line.1930"></a>
-<span class="sourceLineNo">1931</span>      setDaemon(true);<a name="line.1931"></a>
+<span class="sourceLineNo">1918</span>  @VisibleForTesting<a name="line.1918"></a>
+<span class="sourceLineNo">1919</span>  public IdLock getProcExecutionLock() {<a name="line.1919"></a>
+<span class="sourceLineNo">1920</span>    return procExecutionLock;<a name="line.1920"></a>
+<span class="sourceLineNo">1921</span>  }<a name="line.1921"></a>
+<span class="sourceLineNo">1922</span><a name="line.1922"></a>
+<span class="sourceLineNo">1923</span>  // ==========================================================================<a name="line.1923"></a>
+<span class="sourceLineNo">1924</span>  //  Worker Thread<a name="line.1924"></a>
+<span class="sourceLineNo">1925</span>  // ==========================================================================<a name="line.1925"></a>
+<span class="sourceLineNo">1926</span>  private class WorkerThread extends StoppableThread {<a name="line.1926"></a>
+<span class="sourceLineNo">1927</span>    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);<a name="line.1927"></a>
+<span class="sourceLineNo">1928</span>    private volatile Procedure&lt;TEnvironment&gt; activeProcedure;<a name="line.1928"></a>
+<span class="sourceLineNo">1929</span><a name="line.1929"></a>
+<span class="sourceLineNo">1930</span>    public WorkerThread(ThreadGroup group) {<a name="line.1930"></a>
+<span class="sourceLineNo">1931</span>      this(group, "PEWorker-");<a name="line.1931"></a>
 <span class="sourceLineNo">1932</span>    }<a name="line.1932"></a>
 <span class="sourceLineNo">1933</span><a name="line.1933"></a>
-<span class="sourceLineNo">1934</span>    @Override<a name="line.1934"></a>
-<span class="sourceLineNo">1935</span>    public void sendStopSignal() {<a name="line.1935"></a>
-<span class="sourceLineNo">1936</span>      scheduler.signalAll();<a name="line.1936"></a>
+<span class="sourceLineNo">1934</span>    protected WorkerThread(ThreadGroup group, String prefix) {<a name="line.1934"></a>
+<span class="sourceLineNo">1935</span>      super(group, prefix + workerId.incrementAndGet());<a name="line.1935"></a>
+<span class="sourceLineNo">1936</span>      setDaemon(true);<a name="line.1936"></a>
 <span class="sourceLineNo">1937</span>    }<a name="line.1937"></a>
-<span class="sourceLineNo">1938</span>    @Override<a name="line.1938"></a>
-<span class="sourceLineNo">1939</span>    public void run() {<a name="line.1939"></a>
-<span class="sourceLineNo">1940</span>      long lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1940"></a>
-<span class="sourceLineNo">1941</span>      try {<a name="line.1941"></a>
-<span class="sourceLineNo">1942</span>        while (isRunning() &amp;&amp; keepAlive(lastUpdate)) {<a name="line.1942"></a>
-<span class="sourceLineNo">1943</span>          @SuppressWarnings("unchecked")<a name="line.1943"></a>
-<span class="sourceLineNo">1944</span>          Procedure&lt;TEnvironment&gt; proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1944"></a>
-<span class="sourceLineNo">1945</span>          if (proc == null) {<a name="line.1945"></a>
-<span class="sourceLineNo">1946</span>            continue;<a name="line.1946"></a>
-<span class="sourceLineNo">1947</span>          }<a name="line.1947"></a>
-<span class="sourceLineNo">1948</span>          this.activeProcedure = proc;<a name="line.1948"></a>
-<span class="sourceLineNo">1949</span>          int activeCount = activeExecutorCount.incrementAndGet();<a name="line.1949"></a>
-<span class="sourceLineNo">1950</span>          int runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1950"></a>
-<span class="sourceLineNo">1951</span>          LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1951"></a>
-<span class="sourceLineNo">1952</span>            runningCount, activeCount);<a name="line.1952"></a>
-<span class="sourceLineNo">1953</span>          executionStartTime.set(EnvironmentEdgeManager.currentTime());<a name="line.1953"></a>
-<span class="sourceLineNo">1954</span>          IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());<a name="line.1954"></a>
-<span class="sourceLineNo">1955</span>          try {<a name="line.1955"></a>
-<span class="sourceLineNo">1956</span>            executeProcedure(proc);<a name="line.1956"></a>
-<span class="sourceLineNo">1957</span>          } catch (AssertionError e) {<a name="line.1957"></a>
-<span class="sourceLineNo">1958</span>            LOG.info("ASSERT pid=" + proc.getProcId(), e);<a name="line.1958"></a>
-<span class="sourceLineNo">1959</span>            throw e;<a name="line.1959"></a>
-<span class="sourceLineNo">1960</span>          } finally {<a name="line.1960"></a>
-<span class="sourceLineNo">1961</span>            procExecutionLock.releaseLockEntry(lockEntry);<a name="line.1961"></a>
-<span class="sourceLineNo">1962</span>            activeCount = activeExecutorCount.decrementAndGet();<a name="line.1962"></a>
-<span class="sourceLineNo">1963</span>            runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1963"></a>
-<span class="sourceLineNo">1964</span>            LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1964"></a>
-<span class="sourceLineNo">1965</span>              runningCount, activeCount);<a name="line.1965"></a>
-<span class="sourceLineNo">1966</span>            this.activeProcedure = null;<a name="line.1966"></a>
-<span class="sourceLineNo">1967</span>            lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1967"></a>
-<span class="sourceLineNo">1968</span>            executionStartTime.set(Long.MAX_VALUE);<a name="line.1968"></a>
-<span class="sourceLineNo">1969</span>          }<a name="line.1969"></a>
-<span class="sourceLineNo">1970</span>        }<a name="line.1970"></a>
-<span class="sourceLineNo">1971</span>      } catch (Throwable t) {<a name="line.1971"></a>
-<span class="sourceLineNo">1972</span>        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);<a name="line.1972"></a>
-<span class="sourceLineNo">1973</span>      } finally {<a name="line.1973"></a>
-<span class="sourceLineNo">1974</span>        LOG.trace("Worker terminated.");<a name="line.1974"></a>
-<span class="sourceLineNo">1975</span>      }<a name="line.1975"></a>
-<span class="sourceLineNo">1976</span>      workerThreads.remove(this);<a name="line.1976"></a>
-<span class="sourceLineNo">1977</span>    }<a name="line.1977"></a>
-<span class="sourceLineNo">1978</span><a name="line.1978"></a>
-<span class="sourceLineNo">1979</span>    @Override<a name="line.1979"></a>
-<span class="sourceLineNo">1980</span>    public String toString() {<a name="line.1980"></a>
-<span class="sourceLineNo">1981</span>      Procedure&lt;?&gt; p = this.activeProcedure;<a name="line.1981"></a>
-<span class="sourceLineNo">1982</span>      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1982"></a>
-<span class="sourceLineNo">1983</span>    }<a name="line.1983"></a>
-<span class="sourceLineNo">1984</span><a name="line.1984"></a>
-<span class="sourceLineNo">1985</span>    /**<a name="line.1985"></a>
-<span class="sourceLineNo">1986</span>     * @return the time since the current procedure is running<a name="line.1986"></a>
-<span class="sourceLineNo">1987</span>     */<a name="line.1987"></a>
-<span class="sourceLineNo">1988</span>    public long getCurrentRunTime() {<a name="line.1988"></a>
-<span class="sourceLineNo">1989</span>      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a name="line.1989"></a>
-<span class="sourceLineNo">1990</span>    }<a name="line.1990"></a>
-<span class="sourceLineNo">1991</span><a name="line.1991"></a>
-<span class="sourceLineNo">1992</span>    // core worker never timeout<a name="line.1992"></a>
-<span class="sourceLineNo">1993</span>    protected boolean keepAlive(long lastUpdate) {<a name="line.1993"></a>
-<span class="sourceLineNo">1994</span>      return true;<a name="line.1994"></a>
+<span class="sourceLineNo">1938</span><a name="line.1938"></a>
+<span class="sourceLineNo">1939</span>    @Override<a name="line.1939"></a>
+<span class="sourceLineNo">1940</span>    public void sendStopSignal() {<a name="line.1940"></a>
+<span class="sourceLineNo">1941</span>      scheduler.signalAll();<a name="line.1941"></a>
+<span class="sourceLineNo">1942</span>    }<a name="line.1942"></a>
+<span class="sourceLineNo">1943</span>    @Override<a name="line.1943"></a>
+<span class="sourceLineNo">1944</span>    public void run() {<a name="line.1944"></a>
+<span class="sourceLineNo">1945</span>      long lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1945"></a>
+<span class="sourceLineNo">1946</span>      try {<a name="line.1946"></a>
+<span class="sourceLineNo">1947</span>        while (isRunning() &amp;&amp; keepAlive(lastUpdate)) {<a name="line.1947"></a>
+<span class="sourceLineNo">1948</span>          @SuppressWarnings("unchecked")<a name="line.1948"></a>
+<span class="sourceLineNo">1949</span>          Procedure&lt;TEnvironment&gt; proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);<a name="line.1949"></a>
+<span class="sourceLineNo">1950</span>          if (proc == null) {<a name="line.1950"></a>
+<span class="sourceLineNo">1951</span>            continue;<a name="line.1951"></a>
+<span class="sourceLineNo">1952</span>          }<a name="line.1952"></a>
+<span class="sourceLineNo">1953</span>          this.activeProcedure = proc;<a name="line.1953"></a>
+<span class="sourceLineNo">1954</span>          int activeCount = activeExecutorCount.incrementAndGet();<a name="line.1954"></a>
+<span class="sourceLineNo">1955</span>          int runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1955"></a>
+<span class="sourceLineNo">1956</span>          LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1956"></a>
+<span class="sourceLineNo">1957</span>            runningCount, activeCount);<a name="line.1957"></a>
+<span class="sourceLineNo">1958</span>          executionStartTime.set(EnvironmentEdgeManager.currentTime());<a name="line.1958"></a>
+<span class="sourceLineNo">1959</span>          IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());<a name="line.1959"></a>
+<span class="sourceLineNo">1960</span>          try {<a name="line.1960"></a>
+<span class="sourceLineNo">1961</span>            executeProcedure(proc);<a name="line.1961"></a>
+<span class="sourceLineNo">1962</span>          } catch (AssertionError e) {<a name="line.1962"></a>
+<span class="sourceLineNo">1963</span>            LOG.info("ASSERT pid=" + proc.getProcId(), e);<a name="line.1963"></a>
+<span class="sourceLineNo">1964</span>            throw e;<a name="line.1964"></a>
+<span class="sourceLineNo">1965</span>          } finally {<a name="line.1965"></a>
+<span class="sourceLineNo">1966</span>            procExecutionLock.releaseLockEntry(lockEntry);<a name="line.1966"></a>
+<span class="sourceLineNo">1967</span>            activeCount = activeExecutorCount.decrementAndGet();<a name="line.1967"></a>
+<span class="sourceLineNo">1968</span>            runningCount = store.setRunningProcedureCount(activeCount);<a name="line.1968"></a>
+<span class="sourceLineNo">1969</span>            LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),<a name="line.1969"></a>
+<span class="sourceLineNo">1970</span>              runningCount, activeCount);<a name="line.1970"></a>
+<span class="sourceLineNo">1971</span>            this.activeProcedure = null;<a name="line.1971"></a>
+<span class="sourceLineNo">1972</span>            lastUpdate = EnvironmentEdgeManager.currentTime();<a name="line.1972"></a>
+<span class="sourceLineNo">1973</span>            executionStartTime.set(Long.MAX_VALUE);<a name="line.1973"></a>
+<span class="sourceLineNo">1974</span>          }<a name="line.1974"></a>
+<span class="sourceLineNo">1975</span>        }<a name="line.1975"></a>
+<span class="sourceLineNo">1976</span>      } catch (Throwable t) {<a name="line.1976"></a>
+<span class="sourceLineNo">1977</span>        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);<a name="line.1977"></a>
+<span class="sourceLineNo">1978</span>      } finally {<a name="line.1978"></a>
+<span class="sourceLineNo">1979</span>        LOG.trace("Worker terminated.");<a name="line.1979"></a>
+<span class="sourceLineNo">1980</span>      }<a name="line.1980"></a>
+<span class="sourceLineNo">1981</span>      workerThreads.remove(this);<a name="line.1981"></a>
+<span class="sourceLineNo">1982</span>    }<a name="line.1982"></a>
+<span class="sourceLineNo">1983</span><a name="line.1983"></a>
+<span class="sourceLineNo">1984</span>    @Override<a name="line.1984"></a>
+<span class="sourceLineNo">1985</span>    public String toString() {<a name="line.1985"></a>
+<span class="sourceLineNo">1986</span>      Procedure&lt;?&gt; p = this.activeProcedure;<a name="line.1986"></a>
+<span class="sourceLineNo">1987</span>      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");<a name="line.1987"></a>
+<span class="sourceLineNo">1988</span>    }<a name="line.1988"></a>
+<span class="sourceLineNo">1989</span><a name="line.1989"></a>
+<span class="sourceLineNo">1990</span>    /**<a name="line.1990"></a>
+<span class="sourceLineNo">1991</span>     * @return the time since the current procedure is running<a name="line.1991"></a>
+<span class="sourceLineNo">1992</span>     */<a name="line.1992"></a>
+<span class="sourceLineNo">1993</span>    public long getCurrentRunTime() {<a name="line.1993"></a>
+<span class="sourceLineNo">1994</span>      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();<a name="line.1994"></a>
 <span class="sourceLineNo">1995</span>    }<a name="line.1995"></a>
-<span class="sourceLineNo">1996</span>  }<a name="line.1996"></a>
-<span class="sourceLineNo">1997</span><a name="line.1997"></a>
-<span class="sourceLineNo">1998</span>  // A worker thread which can be added when core workers are stuck. Will timeout after<a name="line.1998"></a>
-<span class="sourceLineNo">1999</span>  // keepAliveTime if there is no procedure to run.<a name="line.1999"></a>
-<span class="sourceLineNo">2000</span>  private final class KeepAliveWorkerThread extends WorkerThread {<a name="line.2000"></a>
-<span class="sourceLineNo">2001</span><a name="line.2001"></a>
-<span class="sourceLineNo">2002</span>    public KeepAliveWorkerThread(ThreadGroup group) {<a name="line.2002"></a>
-<span class="sourceLineNo">2003</span>      super(group, "KeepAlivePEWorker-");<a name="line.2003"></a>
-<span class="sourceLineNo">2004</span>    }<a name="line.2004"></a>
-<span class="sourceLineNo">2005</span><a name="line.2005"></a>
-<span class="sourceLineNo">2006</span>    @Override<a name="line.2006"></a>
-<span class="sourceLineNo">2007</span>    protected boolean keepAlive(long lastUpdate) {<a name="line.2007"></a>
-<span class="sourceLineNo">2008</span>      return EnvironmentEdgeManager.currentTime() - lastUpdate &lt; keepAliveTime;<a name="line.2008"></a>
+<span class="sourceLineNo">1996</span><a name="line.1996"></a>
+<span class="sourceLineNo">1997</span>    // core worker never timeout<a name="line.1997"></a>
+<span class="sourceLineNo">1998</span>    protected boolean keepAlive(long lastUpdate) {<a name="line.1998"></a>
+<span class="sourceLineNo">1999</span>      return true;<a name="line.1999"></a>
+<span class="sourceLineNo">2000</span>    }<a name="line.2000"></a>
+<span class="sourceLineNo">2001</span>  }<a name="line.2001"></a>
+<span class="sourceLineNo">2002</span><a name="line.2002"></a>
+<span class="sourceLineNo">2003</span>  // A worker thread which can be added when core workers are stuck. Will timeout after<a name="line.2003"></a>
+<span class="sourceLineNo">2004</span>  // keepAliveTime if there is no procedure to run.<a name="line.2004"></a>
+<span class="sourceLineNo">2005</span>  private final class KeepAliveWorkerThread extends WorkerThread {<a name="line.2005"></a>
+<span class="sourceLineNo">2006</span><a name="line.2006"></a>
+<span class="sourceLineNo">2007</span>    public KeepAliveWorkerThread(ThreadGroup group) {<a name="line.2007"></a>
+<span class="sourceLineNo">2008</span>      super(group, "KeepAlivePEWorker-");<a name="line.2008"></a>
 <span class="sourceLineNo">2009</span>    }<a name="line.2009"></a>
-<span class="sourceLineNo">2010</span>  }<a name="line.2010"></a>
-<span class="sourceLineNo">2011</span><a name="line.2011"></a>
-<span class="sourceLineNo">2012</span>  // ----------------------------------------------------------------------------<a name="line.2012"></a>
-<span class="sourceLineNo">2013</span>  // TODO-MAYBE: Should we provide a InlineChore to notify the store with the<a name="line.2013"></a>
-<span class="sourceLineNo">2014</span>  // full set of procedures pending and completed to write a compacted<a name="line.2014"></a>
-<span class="sourceLineNo">2015</span>  // version of the log (in case is a log)?<a name="line.2015"></a>
-<span class="sourceLineNo">2016</span>  // In theory no, procedures are have a short life, so at some point the store<a name="line.2016"></a>
-<span class="sourceLineNo">2017</span>  // will have the tracker saying everything is in the last log.<a name="line.2017"></a>
-<span class="sourceLineNo">2018</span>  // ----------------------------------------------------------------------------<a name="line.2018"></a>
-<span class="sourceLineNo">2019</span><a name="line.2019"></a>
-<span class="sourceLineNo">2020</span>  private final class WorkerMonitor extends InlineChore {<a name="line.2020"></a>
-<span class="sourceLineNo">2021</span>    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.2021"></a>
-<span class="sourceLineNo">2022</span>        "hbase.procedure.worker.monitor.interval.msec";<a name="line.2022"></a>
-<span class="sourceLineNo">2023</span>    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.2023"></a>
+<span class="sourceLineNo">2010</span><a name="line.2010"></a>
+<span class="sourceLineNo">2011</span>    @Override<a name="line.2011"></a>
+<span class="sourceLineNo">2012</span>    protected boolean keepAlive(long lastUpdate) {<a name="line.2012"></a>
+<span class="sourceLineNo">2013</span>      return EnvironmentEdgeManager.currentTime() - lastUpdate &lt; keepAliveTime;<a name="line.2013"></a>
+<span class="sourceLineNo">2014</span>    }<a name="line.2014"></a>
+<span class="sourceLineNo">2015</span>  }<a name="line.2015"></a>
+<span class="sourceLineNo">2016</span><a name="line.2016"></a>
+<span class="sourceLineNo">2017</span>  // ----------------------------------------------------------------------------<a name="line.2017"></a>
+<span class="sourceLineNo">2018</span>  // TODO-MAYBE: Should we provide a InlineChore to notify the store with the<a name="line.2018"></a>
+<span class="sourceLineNo">2019</span>  // full set of procedures pending and completed to write a compacted<a name="line.2019"></a>
+<span class="sourceLineNo">2020</span>  // version of the log (in case is a log)?<a name="line.2020"></a>
+<span class="sourceLineNo">2021</span>  // In theory no, procedures are have a short life, so at some point the store<a name="line.2021"></a>
+<span class="sourceLineNo">2022</span>  // will have the tracker saying everything is in the last log.<a name="line.2022"></a>
+<span class="sourceLineNo">2023</span>  // ----------------------------------------------------------------------------<a name="line.2023"></a>
 <span class="sourceLineNo">2024</span><a name="line.2024"></a>
-<span class="sourceLineNo">2025</span>    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.2025"></a>
-<span class="sourceLineNo">2026</span>        "hbase.procedure.worker.stuck.threshold.msec";<a name="line.2026"></a>
-<span class="sourceLineNo">2027</span>    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.2027"></a>
-<span class="sourceLineNo">2028</span><a name="line.2028"></a>
-<span class="sourceLineNo">2029</span>    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.2029"></a>
-<span class="sourceLineNo">2030</span>        "hbase.procedure.worker.add.stuck.percentage";<a name="line.2030"></a>
-<span class="sourceLineNo">2031</span>    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.2031"></a>
-<span class="sourceLineNo">2032</span><a name="line.2032"></a>
-<span class="sourceLineNo">2033</span>    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a name="line.2033"></a>
-<span class="sourceLineNo">2034</span>    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.2034"></a>
-<span class="sourceLineNo">2035</span>    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.2035"></a>
-<span class="sourceLineNo">2036</span><a name="line.2036"></a>
-<span class="sourceLineNo">2037</span>    public WorkerMonitor() {<a name="line.2037"></a>
-<span class="sourceLineNo">2038</span>      refreshConfig();<a name="line.2038"></a>
-<span class="sourceLineNo">2039</span>    }<a name="line.2039"></a>
-<span class="sourceLineNo">2040</span><a name="line.2040"></a>
-<span class="sourceLineNo">2041</span>    @Override<a name="line.2041"></a>
-<span class="sourceLineNo">2042</span>    public void run() {<a name="line.2042"></a>
-<span class="sourceLineNo">2043</span>      final int stuckCount = checkForStuckWorkers();<a name="line.2043"></a>
-<span class="sourceLineNo">2044</span>      checkThreadCount(stuckCount);<a name="line.2044"></a>
+<span class="sourceLineNo">2025</span>  private final class WorkerMonitor extends InlineChore {<a name="line.2025"></a>
+<span class="sourceLineNo">2026</span>    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =<a name="line.2026"></a>
+<span class="sourceLineNo">2027</span>        "hbase.procedure.worker.monitor.interval.msec";<a name="line.2027"></a>
+<span class="sourceLineNo">2028</span>    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec<a name="line.2028"></a>
+<span class="sourceLineNo">2029</span><a name="line.2029"></a>
+<span class="sourceLineNo">2030</span>    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =<a name="line.2030"></a>
+<span class="sourceLineNo">2031</span>        "hbase.procedure.worker.stuck.threshold.msec";<a name="line.2031"></a>
+<span class="sourceLineNo">2032</span>    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec<a name="line.2032"></a>
+<span class="sourceLineNo">2033</span><a name="line.2033"></a>
+<span class="sourceLineNo">2034</span>    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =<a name="line.2034"></a>
+<span class="sourceLineNo">2035</span>        "hbase.procedure.worker.add.stuck.percentage";<a name="line.2035"></a>
+<span class="sourceLineNo">2036</span>    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck<a name="line.2036"></a>
+<span class="sourceLineNo">2037</span><a name="line.2037"></a>
+<span class="sourceLineNo">2038</span>    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;<a name="line.2038"></a>
+<span class="sourceLineNo">2039</span>    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;<a name="line.2039"></a>
+<span class="sourceLineNo">2040</span>    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;<a name="line.2040"></a>
+<span class="sourceLineNo">2041</span><a name="line.2041"></a>
+<span class="sourceLineNo">2042</span>    public WorkerMonitor() {<a name="line.2042"></a>
+<span class="sourceLineNo">2043</span>      refreshConfig();<a name="line.2043"></a>
+<span class="sourceLineNo">2044</span>    }<a name="line.2044"></a>
 <span class="sourceLineNo">2045</span><a name="line.2045"></a>
-<span class="sourceLineNo">2046</span>      // refresh interval (poor man dynamic conf update)<a name="line.2046"></a>
-<span class="sourceLineNo">2047</span>      refreshConfig();<a name="line.2047"></a>
-<span class="sourceLineNo">2048</span>    }<a name="line.2048"></a>
-<span class="sourceLineNo">2049</span><a name="line.2049"></a>
-<span class="sourceLineNo">2050</span>    private int checkForStuckWorkers() {<a name="line.2050"></a>
-<span class="sourceLineNo">2051</span>      // check if any of the worker is stuck<a name="line.2051"></a>
-<span class="sourceLineNo">2052</span>      int stuckCount = 0;<a name="line.2052"></a>
-<span class="sourceLineNo">2053</span>      for (WorkerThread worker : workerThreads) {<a name="line.2053"></a>
-<span class="sourceLineNo">2054</span>        if (worker.getCurrentRunTime() &lt; stuckThreshold) {<a name="line.2054"></a>
-<span class="sourceLineNo">2055</span>          continue;<a name="line.2055"></a>
-<span class="sourceLineNo">2056</span>        }<a name="line.2056"></a>
-<span class="sourceLineNo">2057</span><a name="line.2057"></a>
-<span class="sourceLineNo">2058</span>        // WARN the worker is stuck<a name="line.2058"></a>
-<span class="sourceLineNo">2059</span>        stuckCount++;<a name="line.2059"></a>
-<span class="sourceLineNo">2060</span>        LOG.warn("Worker stuck {}, run time {}", worker,<a name="line.2060"></a>
-<span class="sourceLineNo">2061</span>          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.2061"></a>
-<span class="sourceLineNo">2062</span>      }<a name="line.2062"></a>
-<span class="sourceLineNo">2063</span>      return stuckCount;<a name="line.2063"></a>
-<span class="sourceLineNo">2064</span>    }<a name="line.2064"></a>
-<span class="sourceLineNo">2065</span><a name="line.2065"></a>
-<span class="sourceLineNo">2066</span>    private void checkThreadCount(final int stuckCount) {<a name="line.2066"></a>
-<span class="sourceLineNo">2067</span>      // nothing to do if there are no runnable tasks<a name="line.2067"></a>
-<span class="sourceLineNo">2068</span>      if (stuckCount &lt; 1 || !scheduler.hasRunnables()) {<a name="line.2068"></a>
-<span class="sourceLineNo">2069</span>        return;<a name="line.2069"></a>
-<span class="sourceLineNo">2070</span>      }<a name="line.2070"></a>
-<span class="sourceLineNo">2071</span><a name="line.2071"></a>
-<span class="sourceLineNo">2072</span>      // add a new thread if the worker stuck percentage exceed the threshold limit<a name="line.2072"></a>
-<span class="sourceLineNo">2073</span>      // and every handler is active.<a name="line.2073"></a>
-<span class="sourceLineNo">2074</span>      final float stuckPerc = ((float) stuckCount) / workerThreads.size();<a name="line.2074"></a>
-<span class="sourceLineNo">2075</span>      // let's add new worker thread more aggressively, as they will timeout finally if there is no<a name="line.2075"></a>
-<span class="sourceLineNo">2076</span>      // work to do.<a name="line.2076"></a>
-<span class="sourceLineNo">2077</span>      if (stuckPerc &gt;= addWorkerStuckPercentage &amp;&amp; workerThreads.size() &lt; maxPoolSize) {<a name="line.2077"></a>
-<span class="sourceLineNo">2078</span>        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);<a name="line.2078"></a>
-<span class="sourceLineNo">2079</span>        workerThreads.add(worker);<a name="line.2079"></a>
-<span class="sourceLineNo">2080</span>        worker.start();<a name="line.2080"></a>
-<span class="sourceLineNo">2081</span>        LOG.debug("Added new worker thread {}", worker);<a name="line.2081"></a>
-<span class="sourceLineNo">2082</span>      }<a name="line.2082"></a>
-<span class="sourceLineNo">2083</span>    }<a name="line.2083"></a>
-<span class="sourceLineNo">2084</span><a name="line.2084"></a>
-<span class="sourceLineNo">2085</span>    private void refreshConfig() {<a name="line.2085"></a>
-<span class="sourceLineNo">2086</span>      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2086"></a>
-<span class="sourceLineNo">2087</span>          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2087"></a>
-<span class="sourceLineNo">2088</span>      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2088"></a>
-<span class="sourceLineNo">2089</span>        DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2089"></a>
-<span class="sourceLineNo">2090</span>      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2090"></a>
-<span class="sourceLineNo">2091</span>        DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2091"></a>
-<span class="sourceLineNo">2092</span>    }<a name="line.2092"></a>
-<span class="sourceLineNo">2093</span><a name="line.2093"></a>
-<span class="sourceLineNo">2094</span>    @Override<a name="line.2094"></a>
-<span class="sourceLineNo">2095</span>    public int getTimeoutInterval() {<a name="line.2095"></a>
-<span class="sourceLineNo">2096</span>      return timeoutInterval;<a name="line.2096"></a>
+<span class="sourceLineNo">2046</span>    @Override<a name="line.2046"></a>
+<span class="sourceLineNo">2047</span>    public void run() {<a name="line.2047"></a>
+<span class="sourceLineNo">2048</span>      final int stuckCount = checkForStuckWorkers();<a name="line.2048"></a>
+<span class="sourceLineNo">2049</span>      checkThreadCount(stuckCount);<a name="line.2049"></a>
+<span class="sourceLineNo">2050</span><a name="line.2050"></a>
+<span class="sourceLineNo">2051</span>      // refresh interval (poor man dynamic conf update)<a name="line.2051"></a>
+<span class="sourceLineNo">2052</span>      refreshConfig();<a name="line.2052"></a>
+<span class="sourceLineNo">2053</span>    }<a name="line.2053"></a>
+<span class="sourceLineNo">2054</span><a name="line.2054"></a>
+<span class="sourceLineNo">2055</span>    private int checkForStuckWorkers() {<a name="line.2055"></a>
+<span class="sourceLineNo">2056</span>      // check if any of the worker is stuck<a name="line.2056"></a>
+<span class="sourceLineNo">2057</span>      int stuckCount = 0;<a name="line.2057"></a>
+<span class="sourceLineNo">2058</span>      for (WorkerThread worker : workerThreads) {<a name="line.2058"></a>
+<span class="sourceLineNo">2059</span>        if (worker.getCurrentRunTime() &lt; stuckThreshold) {<a name="line.2059"></a>
+<span class="sourceLineNo">2060</span>          continue;<a name="line.2060"></a>
+<span class="sourceLineNo">2061</span>        }<a name="line.2061"></a>
+<span class="sourceLineNo">2062</span><a name="line.2062"></a>
+<span class="sourceLineNo">2063</span>        // WARN the worker is stuck<a name="line.2063"></a>
+<span class="sourceLineNo">2064</span>        stuckCount++;<a name="line.2064"></a>
+<span class="sourceLineNo">2065</span>        LOG.warn("Worker stuck {}, run time {}", worker,<a name="line.2065"></a>
+<span class="sourceLineNo">2066</span>          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));<a name="line.2066"></a>
+<span class="sourceLineNo">2067</span>      }<a name="line.2067"></a>
+<span class="sourceLineNo">2068</span>      return stuckCount;<a name="line.2068"></a>
+<span class="sourceLineNo">2069</span>    }<a name="line.2069"></a>
+<span class="sourceLineNo">2070</span><a name="line.2070"></a>
+<span class="sourceLineNo">2071</span>    private void checkThreadCount(final int stuckCount) {<a name="line.2071"></a>
+<span class="sourceLineNo">2072</span>      // nothing to do if there are no runnable tasks<a name="line.2072"></a>
+<span class="sourceLineNo">2073</span>      if (stuckCount &lt; 1 || !scheduler.hasRunnables()) {<a name="line.2073"></a>
+<span class="sourceLineNo">2074</span>        return;<a name="line.2074"></a>
+<span class="sourceLineNo">2075</span>      }<a name="line.2075"></a>
+<span class="sourceLineNo">2076</span><a name="line.2076"></a>
+<span class="sourceLineNo">2077</span>      // add a new thread if the worker stuck percentage exceed the threshold limit<a name="line.2077"></a>
+<span class="sourceLineNo">2078</span>      // and every handler is active.<a name="line.2078"></a>
+<span class="sourceLineNo">2079</span>      final float stuckPerc = ((float) stuckCount) / workerThreads.size();<a name="line.2079"></a>
+<span class="sourceLineNo">2080</span>      // let's add new worker thread more aggressively, as they will timeout finally if there is no<a name="line.2080"></a>
+<span class="sourceLineNo">2081</span>      // work to do.<a name="line.2081"></a>
+<span class="sourceLineNo">2082</span>      if (stuckPerc &gt;= addWorkerStuckPercentage &amp;&amp; workerThreads.size() &lt; maxPoolSize) {<a name="line.2082"></a>
+<span class="sourceLineNo">2083</span>        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);<a name="line.2083"></a>
+<span class="sourceLineNo">2084</span>        workerThreads.add(worker);<a name="line.2084"></a>
+<span class="sourceLineNo">2085</span>        worker.start();<a name="line.2085"></a>
+<span class="sourceLineNo">2086</span>        LOG.debug("Added new worker thread {}", worker);<a name="line.2086"></a>
+<span class="sourceLineNo">2087</span>      }<a name="line.2087"></a>
+<span class="sourceLineNo">2088</span>    }<a name="line.2088"></a>
+<span class="sourceLineNo">2089</span><a name="line.2089"></a>
+<span class="sourceLineNo">2090</span>    private void refreshConfig() {<a name="line.2090"></a>
+<span class="sourceLineNo">2091</span>      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,<a name="line.2091"></a>
+<span class="sourceLineNo">2092</span>          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);<a name="line.2092"></a>
+<span class="sourceLineNo">2093</span>      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,<a name="line.2093"></a>
+<span class="sourceLineNo">2094</span>        DEFAULT_WORKER_MONITOR_INTERVAL);<a name="line.2094"></a>
+<span class="sourceLineNo">2095</span>      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,<a name="line.2095"></a>
+<span class="sourceLineNo">2096</span>        DEFAULT_WORKER_STUCK_THRESHOLD);<a name="line.2096"></a>
 <span class="sourceLineNo">2097</span>    }<a name="line.2097"></a>
-<span class="sourceLineNo">2098</span>  }<a name="line.2098"></a>
-<span class="sourceLineNo">2099</span>}<a name="line.2099"></a>
+<span class="sourceLineNo">2098</span><a name="line.2098"></a>
+<span class="sourceLineNo">2099</span>    @Override<a name="line.2099"></a>
+<span class="sourceLineNo">2100</span>    public int getTimeoutInterval() {<a name="line.2100"></a>
+<span class="sourceLineNo">2101</span>      return timeoutInterval;<a name="line.2101"></a>
+<span class="sourceLineNo">2102</span>    }<a name="line.2102"></a>
+<span class="sourceLineNo">2103</span>  }<a name="line.2103"></a>
+<span class="sourceLineNo">2104</span>}<a name="line.2104"></a>