You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/24 07:20:49 UTC
[26/27] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment
Manager (Matteo Bertozzi) Move to a new AssignmentManager,
one that describes Assignment using a State Machine built on top of
ProcedureV2 facility.
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 1bb6118..d111683 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -32,6 +32,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
@@ -113,9 +115,11 @@ public class ProcedureExecutor<TEnvironment> {
* Internal cleaner that removes the completed procedure results after a TTL.
* NOTE: This is a special case handled in timeoutLoop().
*
- * Since the client code looks more or less like:
+ * <p>Since the client code looks more or less like:
+ * <pre>
* procId = master.doOperation()
* while (master.getProcResult(procId) == ProcInProgress);
+ * </pre>
* The master should not throw away the proc result as soon as the procedure is done
* but should wait a result request from the client (see executor.removeResult(procId))
* The client will call something like master.isProcDone() or master.getProcResult()
@@ -480,10 +484,10 @@ public class ProcedureExecutor<TEnvironment> {
// We have numThreads executor + one timer thread used for timing out
// procedures and triggering periodic procedures.
this.corePoolSize = numThreads;
- LOG.info("Starting executor worker threads=" + corePoolSize);
+ LOG.info("Starting ProcedureExecutor Worker threads (ProcExecWrkr)=" + corePoolSize);
// Create the Thread Group for the executors
- threadGroup = new ThreadGroup("ProcedureExecutor");
+ threadGroup = new ThreadGroup("ProcExecThrdGrp");
// Create the timeout executor
timeoutExecutor = new TimeoutExecutorThread(threadGroup);
@@ -1077,13 +1081,16 @@ public class ProcedureExecutor<TEnvironment> {
final Long rootProcId = getRootProcedureId(proc);
if (rootProcId == null) {
// The 'proc' was ready to run but the root procedure was rolledback
+ LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
executeRollback(proc);
return;
}
final RootProcedureState procStack = rollbackStack.get(rootProcId);
- if (procStack == null) return;
-
+ if (procStack == null) {
+ LOG.warn("RootProcedureState is null for " + proc.getProcId());
+ return;
+ }
do {
// Try to acquire the execution
if (!procStack.acquire(proc)) {
@@ -1097,6 +1104,7 @@ public class ProcedureExecutor<TEnvironment> {
scheduler.yield(proc);
break;
case LOCK_EVENT_WAIT:
+ LOG.info("DEBUG LOCK_EVENT_WAIT rollback..." + proc);
procStack.unsetRollback();
break;
default:
@@ -1114,6 +1122,7 @@ public class ProcedureExecutor<TEnvironment> {
scheduler.yield(proc);
break;
case LOCK_EVENT_WAIT:
+ LOG.info("DEBUG LOCK_EVENT_WAIT can't rollback child running?..." + proc);
break;
default:
throw new UnsupportedOperationException();
@@ -1125,16 +1134,21 @@ public class ProcedureExecutor<TEnvironment> {
// Execute the procedure
assert proc.getState() == ProcedureState.RUNNABLE : proc;
- switch (acquireLock(proc)) {
+ // Note that lock is NOT about concurrency but rather about ensuring
+ // ownership of a procedure of an entity such as a region or table.
+ LockState lockState = acquireLock(proc);
+ switch (lockState) {
case LOCK_ACQUIRED:
execProcedure(procStack, proc);
releaseLock(proc, false);
break;
case LOCK_YIELD_WAIT:
+ LOG.info(lockState + " " + proc);
scheduler.yield(proc);
break;
case LOCK_EVENT_WAIT:
- // someone will wake us up when the lock is available
+ // Someone will wake us up when the lock is available
+ LOG.debug(lockState + " " + proc);
break;
default:
throw new UnsupportedOperationException();
@@ -1150,10 +1164,7 @@ public class ProcedureExecutor<TEnvironment> {
if (proc.isSuccess()) {
// update metrics on finishing the procedure
proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
- }
+ LOG.info("Finish " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
// Finalize the procedure state
if (proc.getProcId() == rootProcId) {
procedureFinished(proc);
@@ -1178,7 +1189,7 @@ public class ProcedureExecutor<TEnvironment> {
private void releaseLock(final Procedure proc, final boolean force) {
final TEnvironment env = getEnvironment();
- // for how the framework works, we know that we will always have the lock
+ // For how the framework works, we know that we will always have the lock
// when we call releaseLock(), so we can avoid calling proc.hasLock()
if (force || !proc.holdLock(env)) {
proc.doReleaseLock(env);
@@ -1193,6 +1204,8 @@ public class ProcedureExecutor<TEnvironment> {
private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) {
final Procedure rootProc = procedures.get(rootProcId);
RemoteProcedureException exception = rootProc.getException();
+ // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
+ // rolling back because the subprocedure does. Clarify.
if (exception == null) {
exception = procStack.getException();
rootProc.setFailure(exception);
@@ -1269,7 +1282,7 @@ public class ProcedureExecutor<TEnvironment> {
return LockState.LOCK_YIELD_WAIT;
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
- LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
+ LOG.fatal("CODE-BUG: Uncaught runtime exception fo " + proc, e);
}
// allows to kill the executor before something is stored to the wal.
@@ -1305,29 +1318,55 @@ public class ProcedureExecutor<TEnvironment> {
}
/**
- * Executes the specified procedure
- * - calls the doExecute() of the procedure
- * - if the procedure execution didn't fail (e.g. invalid user input)
- * - ...and returned subprocedures
- * - the subprocedures are initialized.
- * - the subprocedures are added to the store
- * - the subprocedures are added to the runnable queue
- * - the procedure is now in a WAITING state, waiting for the subprocedures to complete
- * - ...if there are no subprocedure
- * - the procedure completed successfully
- * - if there is a parent (WAITING)
- * - the parent state will be set to RUNNABLE
- * - in case of failure
- * - the store is updated with the new state
- * - the executor (caller of this method) will start the rollback of the procedure
+ * Executes <code>procedure</code>
+ * <ul>
+ * <li>Calls the doExecute() of the procedure
+ * <li>If the procedure execution didn't fail (i.e. valid user input)
+ * <ul>
+ * <li>...and returned subprocedures
+ * <ul><li>The subprocedures are initialized.
+ * <li>The subprocedures are added to the store
+ * <li>The subprocedures are added to the runnable queue
+ * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
+ * </ul>
+ * </li>
+ * <li>...if there are no subprocedure
+ * <ul><li>the procedure completed successfully
+ * <li>if there is a parent (WAITING)
+ * <li>the parent state will be set to RUNNABLE
+ * </ul>
+ * </li>
+ * </ul>
+ * </li>
+ * <li>In case of failure
+ * <ul>
+ * <li>The store is updated with the new state</li>
+ * <li>The executor (caller of this method) will start the rollback of the procedure</li>
+ * </ul>
+ * </li>
+ * </ul>
*/
- private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
+ private void execProcedure(final RootProcedureState procStack,
+ final Procedure<TEnvironment> procedure) {
Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);
- // Execute the procedure
+ // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
+ // The exception is caught below and then we hurry to the exit without disturbing state. The
+ // idea is that the processing of this procedure will be unsuspended later by an external event
+ // such as the report of a region open. TODO: Currently, it's possible for two worker threads
+ // to be working on the same procedure concurrently (locking in procedures is NOT about
+ // concurrency but about tying an entity to a procedure; i.e. a region to a particular
+ // procedure instance). This can make for issues if both threads are changing state.
+ // See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
+ // in RegionTransitionProcedure#reportTransition for example of Procedure putting
+ // itself back on the scheduler making it possible for two threads running against
+ // the one Procedure. Might be ok if they are both doing different, idempotent sections.
boolean suspended = false;
+
+ // Whether to 're-' -execute; run through the loop again.
boolean reExecute = false;
- Procedure[] subprocs = null;
+
+ Procedure<TEnvironment>[] subprocs = null;
do {
reExecute = false;
try {
@@ -1336,14 +1375,20 @@ public class ProcedureExecutor<TEnvironment> {
subprocs = null;
}
} catch (ProcedureSuspendedException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Suspend " + procedure);
+ }
suspended = true;
} catch (ProcedureYieldException e) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Yield " + procedure + ": " + e.getMessage());
+ LOG.trace("Yield " + procedure + ": " + e.getMessage(), e);
}
scheduler.yield(procedure);
return;
} catch (InterruptedException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e);
+ }
handleInterruptedException(procedure, e);
scheduler.yield(procedure);
return;
@@ -1357,14 +1402,26 @@ public class ProcedureExecutor<TEnvironment> {
if (!procedure.isFailed()) {
if (subprocs != null) {
if (subprocs.length == 1 && subprocs[0] == procedure) {
- // quick-shortcut for a state machine like procedure
+ // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
+ // i.e. we go around this loop again rather than go back out on the scheduler queue.
subprocs = null;
reExecute = true;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Short-circuit to next step on pid=" + procedure.getProcId());
+ }
} else {
- // yield the current procedure, and make the subprocedure runnable
+ // Yield the current procedure, and make the subprocedure runnable
+ // subprocs may come back 'null'.
subprocs = initializeChildren(procStack, procedure, subprocs);
+ LOG.info("Initialized subprocedures=" +
+ (subprocs == null? null:
+ Stream.of(subprocs).map(e -> "{" + e.toString() + "}").
+ collect(Collectors.toList()).toString()));
}
} else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Added to timeoutExecutor " + procedure);
+ }
timeoutExecutor.add(procedure);
} else if (!suspended) {
// No subtask, so we are done
@@ -1388,12 +1445,13 @@ public class ProcedureExecutor<TEnvironment> {
// executor thread to stop. The statement following the method call below seems to check if
// store is not running, to prevent scheduling children procedures, re-execution or yield
// of this procedure. This may need more scrutiny and subsequent cleanup in future
- // Commit the transaction
+ //
+ // Commit the transaction even if a suspend (state may have changed). Note this append
+ // can take a bunch of time to complete.
updateStoreOnExec(procStack, procedure, subprocs);
// if the store is not running we are aborting
if (!store.isRunning()) return;
-
// if the procedure is kind enough to pass the slot to someone else, yield
if (procedure.isRunnable() && !suspended &&
procedure.isYieldAfterExecutionStep(getEnvironment())) {
@@ -1403,14 +1461,14 @@ public class ProcedureExecutor<TEnvironment> {
assert (reExecute && subprocs == null) || !reExecute;
} while (reExecute);
-
// Submit the new subprocedures
if (subprocs != null && !procedure.isFailed()) {
submitChildrenProcedures(subprocs);
}
- // if the procedure is complete and has a parent, count down the children latch
- if (procedure.isFinished() && procedure.hasParent()) {
+ // if the procedure is complete and has a parent, count down the children latch.
+ // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
+ if (!suspended && procedure.isFinished() && procedure.hasParent()) {
countDownChildren(procStack, procedure);
}
}
@@ -1469,18 +1527,13 @@ public class ProcedureExecutor<TEnvironment> {
}
// If this procedure is the last child awake the parent procedure
- final boolean traceEnabled = LOG.isTraceEnabled();
- if (traceEnabled) {
- LOG.trace(parent + " child is done: " + procedure);
- }
-
- if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
- parent.setState(ProcedureState.RUNNABLE);
+ LOG.info("Finish suprocedure " + procedure);
+ if (parent.tryRunnable()) {
+ // If we succeeded in making the parent runnable -- i.e. all of its
+ // children have completed, move parent to front of the queue.
store.update(parent);
scheduler.addFront(parent);
- if (traceEnabled) {
- LOG.trace(parent + " all the children finished their work, resume.");
- }
+ LOG.info("Finished subprocedure(s) of " + parent + "; resume parent processing.");
return;
}
}
@@ -1569,9 +1622,10 @@ public class ProcedureExecutor<TEnvironment> {
// ==========================================================================
private final class WorkerThread extends StoppableThread {
private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
+ private Procedure activeProcedure;
public WorkerThread(final ThreadGroup group) {
- super(group, "ProcExecWorker-" + workerId.incrementAndGet());
+ super(group, "ProcExecWrkr-" + workerId.incrementAndGet());
}
@Override
@@ -1581,29 +1635,49 @@ public class ProcedureExecutor<TEnvironment> {
@Override
public void run() {
- final boolean traceEnabled = LOG.isTraceEnabled();
long lastUpdate = EnvironmentEdgeManager.currentTime();
- while (isRunning() && keepAlive(lastUpdate)) {
- final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
- if (procedure == null) continue;
-
- store.setRunningProcedureCount(activeExecutorCount.incrementAndGet());
- executionStartTime.set(EnvironmentEdgeManager.currentTime());
- try {
- if (traceEnabled) {
- LOG.trace("Trying to start the execution of " + procedure);
+ try {
+ while (isRunning() && keepAlive(lastUpdate)) {
+ this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
+ if (this.activeProcedure == null) continue;
+ int activeCount = activeExecutorCount.incrementAndGet();
+ int runningCount = store.setRunningProcedureCount(activeCount);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Execute pid=" + this.activeProcedure.getProcId() +
+ " runningCount=" + runningCount + ", activeCount=" + activeCount);
+ }
+ executionStartTime.set(EnvironmentEdgeManager.currentTime());
+ try {
+ executeProcedure(this.activeProcedure);
+ } catch (AssertionError e) {
+ LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e);
+ throw e;
+ } finally {
+ activeCount = activeExecutorCount.decrementAndGet();
+ runningCount = store.setRunningProcedureCount(activeCount);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Halt pid=" + this.activeProcedure.getProcId() +
+ " runningCount=" + runningCount + ", activeCount=" + activeCount);
+ }
+ this.activeProcedure = null;
+ lastUpdate = EnvironmentEdgeManager.currentTime();
+ executionStartTime.set(Long.MAX_VALUE);
}
- executeProcedure(procedure);
- } finally {
- store.setRunningProcedureCount(activeExecutorCount.decrementAndGet());
- lastUpdate = EnvironmentEdgeManager.currentTime();
- executionStartTime.set(Long.MAX_VALUE);
}
+ } catch (Throwable t) {
+ LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t);
+ } finally {
+ LOG.debug("Worker terminated.");
}
- LOG.debug("Worker thread terminated " + this);
workerThreads.remove(this);
}
+ @Override
+ public String toString() {
+ // Read the field once so that a concurrent reset to null by run() cannot cause an NPE
+ // between the null check and the getProcId() call (if toString() is invoked from another thread).
+ final Procedure<?> p = this.activeProcedure;
+ // The closing paren sits outside the conditional so it is emitted whether or not a
+ // procedure is active (previously it was only appended in the non-null branch).
+ return getName() + "(pid=" + (p == null ? Procedure.NO_PROC_ID : p.getProcId()) + ")";
+ }
+
/**
* @return the time since the current procedure is running
*/
@@ -1617,14 +1691,15 @@ public class ProcedureExecutor<TEnvironment> {
}
}
- // ==========================================================================
- // Timeout Thread
- // ==========================================================================
+ /**
+ * Runs task on a period such as check for stuck workers.
+ * @see InlineChore
+ */
private final class TimeoutExecutorThread extends StoppableThread {
private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
public TimeoutExecutorThread(final ThreadGroup group) {
- super(group, "ProcedureTimeoutExecutor");
+ super(group, "ProcExecTimeout");
}
@Override
@@ -1634,7 +1709,7 @@ public class ProcedureExecutor<TEnvironment> {
@Override
public void run() {
- final boolean isTraceEnabled = LOG.isTraceEnabled();
+ final boolean traceEnabled = LOG.isTraceEnabled();
while (isRunning()) {
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
if (task == null || task == DelayedUtil.DELAYED_POISON) {
@@ -1643,8 +1718,8 @@ public class ProcedureExecutor<TEnvironment> {
continue;
}
- if (isTraceEnabled) {
- LOG.trace("Trying to start the execution of " + task);
+ if (traceEnabled) {
+ LOG.trace("Executing " + task);
}
// execute the task
@@ -1665,6 +1740,8 @@ public class ProcedureExecutor<TEnvironment> {
public void add(final Procedure procedure) {
assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
+ LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() +
+ ", timestamp=" + procedure.getTimeoutTimestamp());
queue.add(new DelayedProcedure(procedure));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
index bdced10..b148dae 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Special procedure used as a chore.
- * instead of bringing the Chore class in (dependencies reason),
+ * Instead of bringing the Chore class in (dependencies reason),
* we reuse the executor timeout thread for this special case.
*
* The assumption is that procedure is used as hook to dispatch other procedures
@@ -43,7 +43,7 @@ public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEn
protected abstract void periodicExecute(final TEnvironment env);
@Override
- protected Procedure[] execute(final TEnvironment env) {
+ protected Procedure<TEnvironment>[] execute(final TEnvironment env) {
throw new UnsupportedOperationException();
}
@@ -66,4 +66,4 @@ public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEn
public void deserializeStateData(final InputStream stream) {
throw new UnsupportedOperationException();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
index b5295e7..a2ae514 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -93,7 +93,7 @@ public interface ProcedureScheduler {
/**
* Mark the event as not ready.
- * procedures calling waitEvent() will be suspended.
+ * Procedures calling waitEvent() will be suspended.
* @param event the event to mark as suspended/not ready
*/
void suspendEvent(ProcedureEvent event);
@@ -125,6 +125,7 @@ public interface ProcedureScheduler {
* List lock queues.
* @return the locks
*/
+ // TODO: This seems to be the wrong place to hang this method.
List<LockInfo> listLocks();
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
new file mode 100644
index 0000000..8d5ff3c
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+
+import com.google.common.collect.ArrayListMultimap;
+
+/**
+ * A procedure dispatcher that aggregates and sends after elapsed time or after we hit
+ * count threshold. Creates its own threadpool to run RPCs with timeout.
+ * <ul>
+ * <li>Each server queue has a dispatch buffer</li>
+ * <li>Once the dispatch buffer reaches a threshold-size/time we send</li>
+ * </ul>
+ * <p>Operations added to a node via {@link #addOperationToNode(Comparable, RemoteProcedure)}
+ * are buffered per node and handed to {@link #remoteDispatch(Comparable, Set)} either once
+ * <code>operationDelay</code> milliseconds have elapsed since the first buffered operation,
+ * or immediately when the buffer grows beyond <code>queueMaxSize</code> entries.
+ * <p>Call {@link #start()} and then {@link #submitTask(Callable)}. When done,
+ * call {@link #stop()} followed by {@link #join()}.
+ */
+@InterfaceAudience.Private
+public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
+  private static final Log LOG = LogFactory.getLog(RemoteProcedureDispatcher.class);
+
+  public static final String THREAD_POOL_SIZE_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.threadpool.size";
+  private static final int DEFAULT_THREAD_POOL_SIZE = 128;
+
+  public static final String DISPATCH_DELAY_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.delay.msec";
+  private static final int DEFAULT_DISPATCH_DELAY = 150;
+
+  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.max.queue.size";
+  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;
+
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap = new ConcurrentHashMap<>();
+
+  // Milliseconds a node buffers operations before dispatching them.
+  private final int operationDelay;
+  // Buffer size above which a node dispatches immediately instead of waiting.
+  private final int queueMaxSize;
+  // Maximum number of threads in the RPC thread pool.
+  private final int corePoolSize;
+
+  private TimeoutExecutorThread timeoutExecutor;
+  private ThreadPoolExecutor threadPool;
+
+  protected RemoteProcedureDispatcher(Configuration conf) {
+    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
+    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
+    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
+  }
+
+  /**
+   * Starts the timeout thread and the RPC thread pool.
+   * @return false if the dispatcher was already running, true otherwise
+   */
+  public boolean start() {
+    if (running.getAndSet(true)) {
+      LOG.warn("Already running");
+      return false;
+    }
+
+    LOG.info("Starting procedure remote dispatcher; threads=" + this.corePoolSize +
+      ", queueMaxSize=" + this.queueMaxSize + ", operationDelay=" + this.operationDelay);
+
+    // Create the timeout executor
+    timeoutExecutor = new TimeoutExecutorThread();
+    timeoutExecutor.start();
+
+    // Create the thread pool that will execute RPCs
+    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
+      Threads.newDaemonThreadFactory(this.getClass().getSimpleName(),
+        getUncaughtExceptionHandler()));
+    return true;
+  }
+
+  /**
+   * Signals the timeout thread and the thread pool to stop. Does not wait; see {@link #join()}.
+   * @return false if the dispatcher was not running, true otherwise
+   */
+  public boolean stop() {
+    if (!running.getAndSet(false)) {
+      return false;
+    }
+
+    LOG.info("Stopping procedure remote dispatcher");
+
+    // send stop signals
+    timeoutExecutor.sendStopSignal();
+    threadPool.shutdownNow();
+    return true;
+  }
+
+  /**
+   * Waits for the timeout thread and the thread pool to terminate. Must be called after
+   * {@link #stop()}. If the calling thread is interrupted while waiting, the wait is abandoned
+   * and the thread's interrupt status is restored.
+   */
+  public void join() {
+    assert !running.get() : "expected not running";
+
+    // wait the timeout executor
+    timeoutExecutor.awaitTermination();
+    timeoutExecutor = null;
+
+    // wait for the thread pool to terminate
+    threadPool.shutdownNow();
+    try {
+      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+        LOG.warn("Waiting for thread-pool to terminate");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for thread-pool termination", e);
+      // Do not swallow the interrupt: let callers further up the stack observe it.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * @return the handler installed on RPC pool threads; by default it only logs the failure.
+   */
+  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return (t, e) -> LOG.warn("Failed to execute remote procedures " + t.getName(), e);
+  }
+
+  // ============================================================================================
+  //  Node Helpers
+  // ============================================================================================
+  /**
+   * Add a node that will be able to execute remote procedures.
+   * Has no effect if a node with the same key is already registered.
+   * @param key the node identifier
+   */
+  public void addNode(final TRemote key) {
+    assert key != null: "Tried to add a node with a null key";
+    final BufferNode newNode = new BufferNode(key);
+    nodeMap.putIfAbsent(key, newNode);
+  }
+
+  /**
+   * Add a remote rpc. Be sure to check result for successful add.
+   * @param key the node identifier
+   * @param rp the remote procedure to buffer for dispatch to the node
+   * @return True if we successfully added the operation; false if no node is registered
+   *     for <code>key</code> or the node was removed concurrently while adding.
+   */
+  public boolean addOperationToNode(final TRemote key, RemoteProcedure rp) {
+    assert key != null : "found null key for node";
+    BufferNode node = nodeMap.get(key);
+    if (node == null) {
+      return false;
+    }
+    node.add(rp);
+    // Check our node still in the map; could have been removed by #removeNode (or replaced by
+    // a fresh node for the same key). A BufferNode is only ever stored under its own key, so an
+    // identity check on that key is equivalent to the legacy ConcurrentHashMap#contains(value)
+    // scan, but O(1) and not easily misread as containsKey.
+    return nodeMap.get(key) == node;
+  }
+
+  /**
+   * Remove a remote node; any operations still buffered for it are passed to
+   * {@link #abortPendingOperations(Comparable, Set)}.
+   * @param key the node identifier
+   * @return false if no node was registered for <code>key</code>
+   */
+  public boolean removeNode(final TRemote key) {
+    final BufferNode node = nodeMap.remove(key);
+    if (node == null) return false;
+    node.abortOperationsInQueue();
+    return true;
+  }
+
+  // ============================================================================================
+  //  Task Helpers
+  // ============================================================================================
+  /** Submits <code>task</code> to the RPC thread pool for immediate execution. */
+  protected Future<Void> submitTask(Callable<Void> task) {
+    return threadPool.submit(task);
+  }
+
+  /**
+   * Schedules <code>task</code> to be handed to the RPC thread pool once <code>delay</code>
+   * has elapsed.
+   */
+  protected Future<Void> submitTask(Callable<Void> task, long delay, TimeUnit unit) {
+    final FutureTask<Void> futureTask = new FutureTask<>(task);
+    timeoutExecutor.add(new DelayedTask(futureTask, delay, unit));
+    return futureTask;
+  }
+
+  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);
+  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
+
+  /**
+   * Data structure with reference to remote operation.
+   */
+  public static abstract class RemoteOperation {
+    private final RemoteProcedure remoteProcedure;
+
+    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
+      this.remoteProcedure = remoteProcedure;
+    }
+
+    public RemoteProcedure getRemoteProcedure() {
+      return remoteProcedure;
+    }
+  }
+
+  /**
+   * Remote procedure reference.
+   * @param <TEnv>
+   * @param <TRemote>
+   */
+  public interface RemoteProcedure<TEnv, TRemote> {
+    RemoteOperation remoteCallBuild(TEnv env, TRemote remote);
+    void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response);
+    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);
+  }
+
+  /**
+   * Account of what procedures are running on remote node.
+   * @param <TEnv>
+   * @param <TRemote>
+   */
+  public interface RemoteNode<TEnv, TRemote> {
+    TRemote getKey();
+    void add(RemoteProcedure<TEnv, TRemote> operation);
+    void dispatch();
+  }
+
+  /**
+   * Builds one {@link RemoteOperation} per procedure and groups them by the operation's
+   * concrete class.
+   */
+  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
+      final TRemote remote, final Set<RemoteProcedure> operations) {
+    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
+    for (RemoteProcedure proc: operations) {
+      RemoteOperation operation = proc.remoteCallBuild(env, remote);
+      requestByType.put(operation.getClass(), operation);
+    }
+    return requestByType;
+  }
+
+  /**
+   * Removes and returns all operations grouped under <code>type</code>.
+   */
+  @SuppressWarnings("unchecked") // entries under key 'type' were put with getClass() == type
+  protected <T extends RemoteOperation> List<T> fetchType(
+      final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
+    return (List<T>)requestByType.removeAll(type);
+  }
+
+  // ============================================================================================
+  //  Timeout Helpers
+  // ============================================================================================
+  /**
+   * Thread that waits on a DelayQueue and, as entries expire, either hands a DelayedTask to the
+   * thread pool or dispatches a BufferNode.
+   */
+  private final class TimeoutExecutorThread extends Thread {
+    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
+
+    public TimeoutExecutorThread() {
+      super("ProcedureDispatcherTimeoutThread");
+    }
+
+    @Override
+    public void run() {
+      while (running.get()) {
+        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
+        if (task == null || task == DelayedUtil.DELAYED_POISON) {
+          // the executor may be shutting down, and the task is just the shutdown request
+          continue;
+        }
+        if (task instanceof DelayedTask) {
+          threadPool.execute(((DelayedTask)task).getObject());
+        } else {
+          ((BufferNode)task).dispatch();
+        }
+      }
+    }
+
+    public void add(final DelayedWithTimeout delayed) {
+      queue.add(delayed);
+    }
+
+    public void remove(final DelayedWithTimeout delayed) {
+      queue.remove(delayed);
+    }
+
+    public void sendStopSignal() {
+      queue.add(DelayedUtil.DELAYED_POISON);
+    }
+
+    /**
+     * Repeatedly posts the stop signal and joins until this thread exits. If the caller is
+     * interrupted, the wait is abandoned and the caller's interrupt status is restored.
+     */
+    public void awaitTermination() {
+      try {
+        final long startTime = EnvironmentEdgeManager.currentTime();
+        for (int i = 0; isAlive(); ++i) {
+          sendStopSignal();
+          join(250);
+          if (i > 0 && (i % 8) == 0) {
+            LOG.warn("Waiting termination of thread " + getName() + ", " +
+              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn(getName() + " join wait got interrupted", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  // ============================================================================================
+  //  Internals Helpers
+  // ============================================================================================
+
+  /**
+   * Node that contains a set of RemoteProcedures
+   */
+  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
+      implements RemoteNode<TEnv, TRemote> {
+    // Pending operations; null when nothing is buffered (and no timeout is scheduled).
+    private Set<RemoteProcedure> operations;
+
+    protected BufferNode(final TRemote key) {
+      super(key, 0);
+    }
+
+    @Override
+    public TRemote getKey() {
+      return getObject();
+    }
+
+    /**
+     * Buffers <code>operation</code>. The first buffered operation schedules a dispatch after
+     * <code>operationDelay</code> ms; exceeding <code>queueMaxSize</code> dispatches immediately.
+     */
+    public synchronized void add(final RemoteProcedure operation) {
+      if (this.operations == null) {
+        this.operations = new HashSet<>();
+        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
+        timeoutExecutor.add(this);
+      }
+      this.operations.add(operation);
+      if (this.operations.size() > queueMaxSize) {
+        timeoutExecutor.remove(this);
+        dispatch();
+      }
+    }
+
+    @Override
+    public synchronized void dispatch() {
+      if (operations != null) {
+        remoteDispatch(getKey(), operations);
+        this.operations = null;
+      }
+    }
+
+    public synchronized void abortOperationsInQueue() {
+      if (operations != null) {
+        abortPendingOperations(getKey(), operations);
+        this.operations = null;
+      }
+    }
+
+    @Override
+    public String toString() {
+      return super.toString() + ", operations=" + this.operations;
+    }
+  }
+
+  /**
+   * Delayed object that holds a FutureTask.
+   * used to submit something later to the thread-pool.
+   */
+  private static final class DelayedTask extends DelayedContainerWithTimestamp<FutureTask<Void>> {
+    public DelayedTask(final FutureTask<Void> task, final long delay, final TimeUnit unit) {
+      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
index 1a84070..64bb278 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
@@ -27,12 +27,13 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.SequentialProcedureData;
/**
- * A SequentialProcedure describes one step in a procedure chain.
+ * A SequentialProcedure describes one step in a procedure chain:
+ * <pre>
* -> Step 1 -> Step 2 -> Step 3
- *
+ * </pre>
* The main difference from a base Procedure is that the execute() of a
- * SequentialProcedure will be called only once, there will be no second
- * execute() call once the child are finished. which means once the child
+ * SequentialProcedure will be called only once; there will be no second
+ * execute() call once the children are finished, which means once the children
* of a SequentialProcedure are completed the SequentialProcedure is completed too.
*/
@InterfaceAudience.Private
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
index 0590a93..becd9b7 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -21,9 +21,10 @@ package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,7 +35,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMa
/**
* Procedure described by a series of steps.
*
- * The procedure implementor must have an enum of 'states', describing
+ * <p>The procedure implementor must have an enum of 'states', describing
* the various step of the procedure.
* Once the procedure is running, the procedure-framework will call executeFromState()
* using the 'state' provided by the user. The first call to executeFromState()
@@ -56,7 +57,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
private int stateCount = 0;
private int[] states = null;
- private ArrayList<Procedure> subProcList = null;
+ private List<Procedure<TEnvironment>> subProcList = null;
protected enum Flow {
HAS_MORE_STATE,
@@ -70,7 +71,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
* Flow.HAS_MORE_STATE if there is another step.
*/
protected abstract Flow executeFromState(TEnvironment env, TState state)
- throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
+ throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
/**
* called to perform the rollback of the specified state
@@ -125,12 +126,15 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
* Add a child procedure to execute
* @param subProcedure the child procedure
*/
- protected void addChildProcedure(Procedure... subProcedure) {
+ protected void addChildProcedure(Procedure<TEnvironment>... subProcedure) {
+ if (subProcedure == null) return;
+ final int len = subProcedure.length;
+ if (len == 0) return;
if (subProcList == null) {
- subProcList = new ArrayList<>(subProcedure.length);
+ subProcList = new ArrayList<>(len);
}
- for (int i = 0; i < subProcedure.length; ++i) {
- Procedure proc = subProcedure[i];
+ for (int i = 0; i < len; ++i) {
+ Procedure<TEnvironment> proc = subProcedure[i];
if (!proc.hasOwner()) proc.setOwner(getOwner());
subProcList.add(proc);
}
@@ -138,27 +142,23 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
@Override
protected Procedure[] execute(final TEnvironment env)
- throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+ throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
updateTimestamp();
try {
failIfAborted();
if (!hasMoreState() || isFailed()) return null;
-
TState state = getCurrentState();
if (stateCount == 0) {
setNextState(getStateId(state));
}
-
stateFlow = executeFromState(env, state);
if (!hasMoreState()) setNextState(EOF_STATE);
-
- if (subProcList != null && subProcList.size() != 0) {
+ if (subProcList != null && !subProcList.isEmpty()) {
Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
subProcList = null;
return subProcedures;
}
-
return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
} finally {
updateTimestamp();
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
index c03e326..9e53f42 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
@@ -52,8 +52,8 @@ public class NoopProcedureStore extends ProcedureStoreBase {
}
@Override
- public void setRunningProcedureCount(final int count) {
- // no-op
+ public int setRunningProcedureCount(final int count) {
+ return count;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 385cedb..a690c81 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -153,8 +153,9 @@ public interface ProcedureStore {
/**
* Set the number of procedure running.
* This can be used, for example, by the store to know how long to wait before a sync.
+ * @return how many procedures are running (may differ from {@code count}).
*/
- void setRunningProcedureCount(int count);
+ int setRunningProcedureCount(int count);
/**
* Acquire the lease for the procedure store.
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 012ddeb..95a1ef6 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -155,9 +155,23 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
this.logSize += size;
}
- public void removeFile() throws IOException {
+ public void removeFile(final Path walArchiveDir) throws IOException {
close();
- fs.delete(logFile, false);
+ boolean archived = false;
+ if (walArchiveDir != null) {
+ Path archivedFile = new Path(walArchiveDir, logFile.getName());
+ LOG.info("ARCHIVED (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) " + logFile + " to " + archivedFile);
+ if (!fs.rename(logFile, archivedFile)) {
+ LOG.warn("Failed archive of " + logFile + ", deleting");
+ } else {
+ archived = true;
+ }
+ }
+ if (!archived) {
+ if (!fs.delete(logFile, false)) {
+ LOG.warn("Failed delete of " + logFile);
+ }
+ }
}
public void setProcIds(long minId, long maxId) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index c672045..0a05e6e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -83,11 +83,11 @@ public class ProcedureWALFormatReader {
//
// Fast Start: INIT/INSERT record and StackIDs
// ---------------------------------------------
- // We have two special record, INIT and INSERT that tracks the first time
- // the procedure was added to the WAL. We can use that information to be able
- // to start procedures before reaching the end of the WAL, or before reading all the WALs.
- // but in some cases the WAL with that record can be already gone.
- // In alternative we can use the stackIds on each procedure,
+ // We have two special records, INIT and INSERT, that track the first time
+ // the procedure was added to the WAL. We can use this information to be able
+ // to start procedures before reaching the end of the WAL, or before reading all WALs.
+ // But in some cases, the WAL with that record can be already gone.
+ // As an alternative, we can use the stackIds on each procedure,
// to identify when a procedure is ready to start.
// If there are gaps in the sum of the stackIds we need to read more WALs.
//
@@ -107,16 +107,16 @@ public class ProcedureWALFormatReader {
* Global tracker that will be used by the WALProcedureStore after load.
* If the last WAL was closed cleanly we already have a full tracker ready to be used.
* If the last WAL was truncated (e.g. master killed) the tracker will be empty
- * and the 'partial' flag will be set. In this case on WAL replay we are going
+ * and the 'partial' flag will be set. In this case, on WAL replay we are going
* to rebuild the tracker.
*/
private final ProcedureStoreTracker tracker;
- // private final boolean hasFastStartSupport;
+ // TODO: private final boolean hasFastStartSupport;
/**
* If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we
* re-build the list of procedures updated in that WAL because we need it for log cleaning
- * purpose. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted.
+ * purposes. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted.
* (see {@link WALProcedureStore#removeInactiveLogs()}).
* However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother
* re-building it.
@@ -137,7 +137,7 @@ public class ProcedureWALFormatReader {
public void read(final ProcedureWALFile log) throws IOException {
localTracker = log.getTracker().isPartial() ? log.getTracker() : null;
if (localTracker != null) {
- LOG.info("Rebuilding tracker for log - " + log);
+ LOG.info("Rebuilding tracker for " + log);
}
FSDataInputStream stream = log.getStream();
@@ -146,7 +146,7 @@ public class ProcedureWALFormatReader {
while (hasMore) {
ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
if (entry == null) {
- LOG.warn("nothing left to decode. exiting with missing EOF");
+ LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log);
break;
}
switch (entry.getType()) {
@@ -171,7 +171,7 @@ public class ProcedureWALFormatReader {
}
}
} catch (InvalidProtocolBufferException e) {
- LOG.error("got an exception while reading the procedure WAL: " + log, e);
+ LOG.error("While reading procedure from " + log, e);
loader.markCorruptedWAL(log, e);
}
@@ -211,7 +211,7 @@ public class ProcedureWALFormatReader {
maxProcId = Math.max(maxProcId, proc.getProcId());
if (isRequired(proc.getProcId())) {
if (LOG.isTraceEnabled()) {
- LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
+ LOG.trace("Read " + entry.getType() + " entry " + proc.getProcId());
}
localProcedureMap.add(proc);
if (tracker.isPartial()) {
@@ -296,7 +296,7 @@ public class ProcedureWALFormatReader {
// replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
//
// We also have a lazy grouping by "root procedure", and a list of
- // unlinked procedure. If after reading all the WALs we have unlinked
+ // unlinked procedures. If after reading all the WALs we have unlinked
// procedures it means that we had a missing WAL or a corruption.
// rootHead = A <-> D <-> G
// B E
@@ -639,17 +639,17 @@ public class ProcedureWALFormatReader {
* "ready" means that we all the information that we need in-memory.
*
* Example-1:
- * We have two WALs, we start reading fronm the newest (wal-2)
+ * We have two WALs, we start reading from the newest (wal-2)
* wal-2 | C B |
* wal-1 | A B C |
*
* If C and B don't depend on A (A is not the parent), we can start them
- * before reading wal-1. If B is the only one with parent A we can start C
- * and read one more WAL before being able to start B.
+ * before reading wal-1. If B is the only one with parent A we can start C.
+ * We have to read one more WAL before being able to start B.
*
* How do we know with the only information in B that we are not ready.
* - easy case, the parent is missing from the global map
- * - more complex case we look at the Stack IDs
+ * - more complex case we look at the Stack IDs.
*
* The Stack-IDs are added to the procedure order as incremental index
* tracking how many times that procedure was executed, which is equivalent
@@ -664,7 +664,7 @@ public class ProcedureWALFormatReader {
* executed before.
* To identify when a Procedure is ready we do the sum of the stackIds of
* the procedure and the parent. if the stackIdSum is equals to the
- * sum of {1..maxStackId} then everything we need is avaiable.
+ * sum of {1..maxStackId} then everything we need is available.
*
* Example-2
* wal-2 | A | A stackIds = [0, 2]
@@ -676,7 +676,7 @@ public class ProcedureWALFormatReader {
assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
if (rootEntry.isFinished()) {
- // if the root procedure is finished, sub-procedures should be gone
+ // If the root procedure is finished, sub-procedures should be gone
if (rootEntry.childHead != null) {
LOG.error("unexpected active children for root-procedure: " + rootEntry);
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 4712c30..1791cae 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -66,6 +66,7 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceStability.Evolving
public class WALProcedureStore extends ProcedureStoreBase {
private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
+ public static final String LOG_PREFIX = "pv2-";
public interface LeaseRecovery {
void recoverFileLease(FileSystem fs, Path path) throws IOException;
@@ -124,6 +125,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private final Configuration conf;
private final FileSystem fs;
private final Path walDir;
+ private final Path walArchiveDir;
private final AtomicReference<Throwable> syncException = new AtomicReference<>();
private final AtomicBoolean loading = new AtomicBoolean(true);
@@ -185,9 +187,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
final LeaseRecovery leaseRecovery) {
+ this(conf, fs, walDir, null, leaseRecovery);
+ }
+
+ public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
+ final Path walArchiveDir, final LeaseRecovery leaseRecovery) {
this.fs = fs;
this.conf = conf;
this.walDir = walDir;
+ this.walArchiveDir = walArchiveDir;
this.leaseRecovery = leaseRecovery;
}
@@ -239,6 +247,16 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
};
syncThread.start();
+
+ // Create the archive dir up front; on HDFS the rename into it won't work without it.
+ if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
+ if (this.fs.mkdirs(this.walArchiveDir)) {
+ if (LOG.isDebugEnabled()) LOG.debug("Created Procedure Store WAL archive dir " +
+ this.walArchiveDir);
+ } else {
+ LOG.warn("Failed create of " + this.walArchiveDir);
+ }
+ }
}
@Override
@@ -292,9 +310,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@Override
- public void setRunningProcedureCount(final int count) {
- LOG.debug("Set running procedure count=" + count + ", slots=" + slots.length);
+ public int setRunningProcedureCount(final int count) {
this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
+ return this.runningProcCount;
}
public ProcedureStoreTracker getStoreTracker() {
@@ -343,7 +361,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
if (LOG.isDebugEnabled()) {
LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
}
- logs.getLast().removeFile();
+ logs.getLast().removeFile(this.walArchiveDir);
continue;
}
@@ -955,7 +973,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
// but we should check if someone else has created new files
if (getMaxLogId(getLogFiles()) > flushLogId) {
LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
- logs.getLast().removeFile();
+ logs.getLast().removeFile(this.walArchiveDir);
return false;
}
@@ -1047,7 +1065,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
// We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
// once there is nothing olding the oldest WAL we can remove it.
while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
- removeLogFile(logs.getFirst());
+ removeLogFile(logs.getFirst(), walArchiveDir);
buildHoldingCleanupTracker();
}
@@ -1079,8 +1097,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
private void removeAllLogs(long lastLogId) {
if (logs.size() <= 1) return;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Remove all state logs with ID less than " + lastLogId);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Remove all state logs with ID less than " + lastLogId);
}
boolean removed = false;
@@ -1089,7 +1107,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
if (lastLogId < log.getLogId()) {
break;
}
- removeLogFile(log);
+ removeLogFile(log, walArchiveDir);
removed = true;
}
@@ -1098,15 +1116,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
- private boolean removeLogFile(final ProcedureWALFile log) {
+ private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Removing log=" + log);
}
- log.removeFile();
+ log.removeFile(walArchiveDir);
logs.remove(log);
if (LOG.isDebugEnabled()) {
- LOG.info("Removed log=" + log + " activeLogs=" + logs);
+ LOG.info("Removed log=" + log + ", activeLogs=" + logs);
}
assert logs.size() > 0 : "expected at least one log";
} catch (IOException e) {
@@ -1128,7 +1146,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
protected Path getLogFilePath(final long logId) throws IOException {
- return new Path(walDir, String.format("state-%020d.log", logId));
+ return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId));
}
private static long getLogIdFromName(final String name) {
@@ -1141,7 +1159,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
@Override
public boolean accept(Path path) {
String name = path.getName();
- return name.startsWith("state-") && name.endsWith(".log");
+ return name.startsWith(LOG_PREFIX) && name.endsWith(".log");
}
};
@@ -1192,7 +1210,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
- ProcedureWALFile log = initOldLog(logFiles[i]);
+ ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
if (log != null) {
this.logs.add(log);
}
@@ -1222,21 +1240,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
/**
* Loads given log file and it's tracker.
*/
- private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
+ private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
+ throws IOException {
final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
if (logFile.getLen() == 0) {
LOG.warn("Remove uninitialized log: " + logFile);
- log.removeFile();
+ log.removeFile(walArchiveDir);
return null;
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Opening state-log: " + logFile);
+ LOG.debug("Opening Pv2 " + logFile);
}
try {
log.open();
} catch (ProcedureWALFormat.InvalidWALDataException e) {
LOG.warn("Remove uninitialized log: " + logFile, e);
- log.removeFile();
+ log.removeFile(walArchiveDir);
return null;
} catch (IOException e) {
String msg = "Unable to read state log: " + logFile;
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
index cde37bd..faf8e7e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+// TODO: fix the naming of the classes in this file.
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class DelayedUtil {
@@ -148,6 +149,9 @@ public final class DelayedUtil {
}
}
+ /**
+ * A {@link DelayedContainer} that also carries a settable timeout value.
+ */
public static class DelayedContainerWithTimestamp<T> extends DelayedContainer<T> {
private long timeout;
@@ -165,4 +169,4 @@ public final class DelayedUtil {
this.timeout = timeout;
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
index 408cffd..78daf5a 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
@@ -42,7 +42,7 @@ public class TestProcedureToString {
*/
static class BasicProcedure extends Procedure<BasicProcedureEnv> {
@Override
- protected Procedure<?>[] execute(BasicProcedureEnv env)
+ protected Procedure<BasicProcedureEnv>[] execute(BasicProcedureEnv env)
throws ProcedureYieldException, InterruptedException {
return new Procedure [] {this};
}
@@ -78,8 +78,6 @@ public class TestProcedureToString {
}
}
-
-
/**
* Test that I can override the toString for its state value.
* @throws ProcedureYieldException