You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/01/14 05:07:27 UTC
[4/4] hbase git commit: HBASE-16744 Procedure V2 - Lock procedures to
allow clients to acquire locks on tables/namespaces/regions (Matteo Bertozzi)
HBASE-16744 Procedure V2 - Lock procedures to allow clients to acquire
locks on tables/namespaces/regions (Matteo Bertozzi)
Incorporates review comments from
https://reviews.apache.org/r/52589/
https://reviews.apache.org/r/54388/
M hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
Fix for eclipse complaint (from Duo Zhang)
M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
Log formatting
M hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
Added wait procedures utility.
A hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/LockServiceProtos.java
A hbase-protocol-shaded/src/main/protobuf/LockService.proto
Implement new locking CP overrides.
A hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/EntityLock.java
New hbase entity lock (ns, table, or regions)
A hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/LockServiceClient.java
Client that can use the new internal locking service.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4cb09a49
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4cb09a49
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4cb09a49
Branch: refs/heads/master
Commit: 4cb09a494c4148de2b4e8c6cd011bacdf7f33b1a
Parents: 9fd5dab1
Author: Michael Stack <st...@apache.org>
Authored: Wed Jan 11 14:38:59 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Fri Jan 13 21:07:03 2017 -0800
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncTableBase.java | 4 +-
.../hadoop/hbase/procedure2/Procedure.java | 10 +-
.../hbase/procedure2/ProcedureExecutor.java | 98 +-
.../procedure2/store/wal/WALProcedureStore.java | 8 +-
.../procedure2/ProcedureTestingUtility.java | 12 +
.../protobuf/generated/LockServiceProtos.java | 5328 ++++++++++++++++++
.../src/main/protobuf/LockService.proto | 79 +
.../hbase/rsgroup/RSGroupAdminEndpoint.java | 54 +-
.../hadoop/hbase/client/locking/EntityLock.java | 266 +
.../hbase/client/locking/LockServiceClient.java | 111 +
.../BaseMasterAndRegionObserver.java | 23 +
.../hbase/coprocessor/BaseMasterObserver.java | 25 +-
.../hbase/coprocessor/MasterObserver.java | 36 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 9 +-
.../hbase/master/MasterCoprocessorHost.java | 43 +
.../hadoop/hbase/master/MasterRpcServices.java | 68 +-
.../hadoop/hbase/master/MasterServices.java | 5 +
.../hbase/master/locking/LockManager.java | 271 +
.../hbase/master/locking/LockProcedure.java | 462 ++
.../org/apache/hadoop/hbase/util/IdLock.java | 12 +-
.../hbase/client/locking/TestEntityLocks.java | 182 +
.../hbase/coprocessor/TestMasterObserver.java | 64 +-
.../hbase/master/MockNoopMasterServices.java | 8 +-
.../hadoop/hbase/master/MockRegionServer.java | 1 +
.../hbase/master/locking/TestLockManager.java | 161 +
.../hbase/master/locking/TestLockProcedure.java | 456 ++
26 files changed, 7709 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index 19a22c0..d80627f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -386,8 +386,8 @@ public interface AsyncTableBase {
* @return A list of {@link CompletableFuture}s that represent the existence for each get.
*/
default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
- return get(toCheckExistenceOnly(gets)).stream().map(f -> f.thenApply(r -> r.getExists()))
- .collect(toList());
+ return get(toCheckExistenceOnly(gets)).stream().
+ <CompletableFuture<Boolean>>map(f -> f.thenApply(r -> r.getExists())).collect(toList());
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index cb4ee47..3f3cf33 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -243,24 +243,24 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
final StringBuilder sb = new StringBuilder();
toStringClassDetails(sb);
- sb.append(" id=");
+ sb.append(", procId=");
sb.append(getProcId());
if (hasParent()) {
- sb.append(" parent=");
+ sb.append(", parent=");
sb.append(getParentProcId());
}
if (hasOwner()) {
- sb.append(" owner=");
+ sb.append(", owner=");
sb.append(getOwner());
}
- sb.append(" state=");
+ sb.append(", state=");
toStringState(sb);
if (hasException()) {
- sb.append(" failed=" + getException());
+ sb.append(", failed=" + getException());
}
return sb;
http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index c65f3fb..d3b65e8 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -91,7 +92,7 @@ public class ProcedureExecutor<TEnvironment> {
final boolean kill = this.killBeforeStoreUpdate;
if (this.toggleKillBeforeStoreUpdate) {
this.killBeforeStoreUpdate = !kill;
- LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
+ LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
}
return kill;
}
@@ -172,7 +173,7 @@ public class ProcedureExecutor<TEnvironment> {
final long now = EnvironmentEdgeManager.currentTime();
final Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
- final boolean isDebugEnabled = LOG.isDebugEnabled();
+ final boolean debugEnabled = LOG.isDebugEnabled();
while (it.hasNext() && store.isRunning()) {
final Map.Entry<Long, ProcedureInfo> entry = it.next();
final ProcedureInfo procInfo = entry.getValue();
@@ -180,8 +181,8 @@ public class ProcedureExecutor<TEnvironment> {
// TODO: Select TTL based on Procedure type
if ((procInfo.hasClientAckTime() && (now - procInfo.getClientAckTime()) >= evictAckTtl) ||
(now - procInfo.getLastUpdate()) >= evictTtl) {
- if (isDebugEnabled) {
- LOG.debug("Evict completed procedure: " + procInfo);
+ if (debugEnabled) {
+ LOG.debug("Evict completed " + procInfo);
}
batchIds[batchCount++] = entry.getKey();
if (batchCount == batchIds.length) {
@@ -281,7 +282,7 @@ public class ProcedureExecutor<TEnvironment> {
@Override
public void setMaxProcId(long maxProcId) {
assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
- LOG.debug("load procedures maxProcId=" + maxProcId);
+ LOG.debug("Load maxProcId=" + maxProcId);
lastProcId.set(maxProcId);
}
@@ -295,7 +296,7 @@ public class ProcedureExecutor<TEnvironment> {
int corruptedCount = 0;
while (procIter.hasNext()) {
ProcedureInfo proc = procIter.nextAsProcedureInfo();
- LOG.error("corrupted procedure: " + proc);
+ LOG.error("Corrupt " + proc);
corruptedCount++;
}
if (abortOnCorruption && corruptedCount > 0) {
@@ -307,7 +308,7 @@ public class ProcedureExecutor<TEnvironment> {
private void loadProcedures(final ProcedureIterator procIter,
final boolean abortOnCorruption) throws IOException {
- final boolean isDebugEnabled = LOG.isDebugEnabled();
+ final boolean debugEnabled = LOG.isDebugEnabled();
// 1. Build the rollback stack
int runnablesCount = 0;
@@ -320,8 +321,8 @@ public class ProcedureExecutor<TEnvironment> {
nonceKey = proc.getNonceKey();
procId = proc.getProcId();
completed.put(proc.getProcId(), proc);
- if (isDebugEnabled) {
- LOG.debug("The procedure is completed: " + proc);
+ if (debugEnabled) {
+ LOG.debug("Completed " + proc);
}
} else {
Procedure proc = procIter.nextAsProcedure();
@@ -361,8 +362,8 @@ public class ProcedureExecutor<TEnvironment> {
Procedure proc = procIter.nextAsProcedure();
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
- if (isDebugEnabled) {
- LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
+ if (debugEnabled) {
+ LOG.debug(String.format("Loading state=%s isFailed=%s: %s",
proc.getState(), proc.hasException(), proc));
}
@@ -425,7 +426,7 @@ public class ProcedureExecutor<TEnvironment> {
if (procStack.isValid()) continue;
for (Procedure proc: procStack.getSubproceduresStack()) {
- LOG.error("corrupted procedure: " + proc);
+ LOG.error("Corrupted " + proc);
procedures.remove(proc.getProcId());
runnableList.remove(proc);
if (waitingSet != null) waitingSet.remove(proc);
@@ -485,7 +486,7 @@ public class ProcedureExecutor<TEnvironment> {
// We have numThreads executor + one timer thread used for timing out
// procedures and triggering periodic procedures.
this.corePoolSize = numThreads;
- LOG.info("Starting procedure executor threads=" + corePoolSize);
+ LOG.info("Starting executor threads=" + corePoolSize);
// Create the Thread Group for the executors
threadGroup = new ThreadGroup("ProcedureExecutor");
@@ -506,7 +507,7 @@ public class ProcedureExecutor<TEnvironment> {
st = EnvironmentEdgeManager.currentTime();
store.recoverLease();
et = EnvironmentEdgeManager.currentTime();
- LOG.info(String.format("recover procedure store (%s) lease: %s",
+ LOG.info(String.format("Recover store (%s) lease: %s",
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
// start the procedure scheduler
@@ -520,11 +521,11 @@ public class ProcedureExecutor<TEnvironment> {
st = EnvironmentEdgeManager.currentTime();
load(abortOnCorruption);
et = EnvironmentEdgeManager.currentTime();
- LOG.info(String.format("load procedure store (%s): %s",
+ LOG.info(String.format("Load store (%s): %s",
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
// Start the executors. Here we must have the lastProcId set.
- LOG.debug("start workers " + workerThreads.size());
+ LOG.debug("Start workers " + workerThreads.size());
timeoutExecutor.start();
for (WorkerThread worker: workerThreads) {
worker.start();
@@ -542,7 +543,7 @@ public class ProcedureExecutor<TEnvironment> {
return;
}
- LOG.info("Stopping the procedure executor");
+ LOG.info("Stopping");
scheduler.stop();
timeoutExecutor.sendStopSignal();
}
@@ -564,7 +565,7 @@ public class ProcedureExecutor<TEnvironment> {
try {
threadGroup.destroy();
} catch (IllegalThreadStateException e) {
- LOG.error("thread group " + threadGroup + " contains running threads");
+ LOG.error("Thread group " + threadGroup + " contains running threads");
threadGroup.list();
} finally {
threadGroup = null;
@@ -693,12 +694,12 @@ public class ProcedureExecutor<TEnvironment> {
// we found a registered nonce, but the procedure may not have been submitted yet.
// since the client expect the procedure to be submitted, spin here until it is.
- final boolean isTraceEnabled = LOG.isTraceEnabled();
+ final boolean traceEnabled = LOG.isTraceEnabled();
while (isRunning() &&
!(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
nonceKeysToProcIdsMap.containsKey(nonceKey)) {
- if (isTraceEnabled) {
- LOG.trace("waiting for procId=" + oldProcId.longValue() + " to be submitted");
+ if (traceEnabled) {
+ LOG.trace("Waiting for procId=" + oldProcId.longValue() + " to be submitted");
}
Threads.sleep(100);
}
@@ -787,7 +788,7 @@ public class ProcedureExecutor<TEnvironment> {
// Commit the transaction
store.insert(proc, null);
if (LOG.isDebugEnabled()) {
- LOG.debug("Procedure " + proc + " added to the store.");
+ LOG.debug("Stored " + proc);
}
// Add the procedure to the executor
@@ -811,7 +812,7 @@ public class ProcedureExecutor<TEnvironment> {
// Commit the transaction
store.insert(procs);
if (LOG.isDebugEnabled()) {
- LOG.debug("Procedures added to the store: " + Arrays.toString(procs));
+ LOG.debug("Stored " + Arrays.toString(procs));
}
// Add the procedure to the executor
@@ -880,6 +881,14 @@ public class ProcedureExecutor<TEnvironment> {
return procedures.get(procId);
}
+ public <T extends Procedure> T getProcedure(final Class<T> clazz, final long procId) {
+ final Procedure proc = getProcedure(procId);
+ if (clazz.isInstance(proc)) {
+ return (T)proc;
+ }
+ return null;
+ }
+
public ProcedureInfo getResult(final long procId) {
return completed.get(procId);
}
@@ -917,7 +926,7 @@ public class ProcedureExecutor<TEnvironment> {
if (result == null) {
assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
if (LOG.isDebugEnabled()) {
- LOG.debug("Procedure procId=" + procId + " already removed by the cleaner.");
+ LOG.debug("procId=" + procId + " already removed by the cleaner.");
}
return;
}
@@ -999,7 +1008,7 @@ public class ProcedureExecutor<TEnvironment> {
try {
listener.procedureLoaded(procId);
} catch (Throwable e) {
- LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
+ LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
}
}
}
@@ -1011,7 +1020,7 @@ public class ProcedureExecutor<TEnvironment> {
try {
listener.procedureAdded(procId);
} catch (Throwable e) {
- LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
+ LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
}
}
}
@@ -1023,7 +1032,7 @@ public class ProcedureExecutor<TEnvironment> {
try {
listener.procedureFinished(procId);
} catch (Throwable e) {
- LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
+ LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
}
}
}
@@ -1053,6 +1062,11 @@ public class ProcedureExecutor<TEnvironment> {
return lastProcId.get();
}
+ @VisibleForTesting
+ public Set<Long> getActiveProcIds() {
+ return procedures.keySet();
+ }
+
private Long getRootProcedureId(Procedure proc) {
return Procedure.getRootProcedureId(procedures, proc);
}
@@ -1111,7 +1125,7 @@ public class ProcedureExecutor<TEnvironment> {
if (proc.isSuccess()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Procedure completed in " +
+ LOG.debug("Completed in " +
StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
}
// Finalize the procedure state
@@ -1203,7 +1217,7 @@ public class ProcedureExecutor<TEnvironment> {
}
// Finalize the procedure state
- LOG.info("Rolledback procedure " + rootProc +
+ LOG.info("Rolled back " + rootProc +
" exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
" exception=" + exception.getMessage());
procedureFinished(rootProc);
@@ -1220,7 +1234,7 @@ public class ProcedureExecutor<TEnvironment> {
proc.doRollback(getEnvironment());
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
- LOG.debug("rollback attempt failed for " + proc, e);
+ LOG.debug("Roll back attempt failed for " + proc, e);
}
return false;
} catch (InterruptedException e) {
@@ -1294,7 +1308,7 @@ public class ProcedureExecutor<TEnvironment> {
isSuspended = true;
} catch (ProcedureYieldException e) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
+ LOG.trace("Yield " + procedure + ": " + e.getMessage());
}
scheduler.yield(procedure);
return;
@@ -1418,8 +1432,8 @@ public class ProcedureExecutor<TEnvironment> {
}
// If this procedure is the last child awake the parent procedure
- final boolean isTraceEnabled = LOG.isTraceEnabled();
- if (isTraceEnabled) {
+ final boolean traceEnabled = LOG.isTraceEnabled();
+ if (traceEnabled) {
LOG.trace(parent + " child is done: " + procedure);
}
@@ -1427,7 +1441,7 @@ public class ProcedureExecutor<TEnvironment> {
parent.setState(ProcedureState.RUNNABLE);
store.update(parent);
scheduler.addFront(parent);
- if (isTraceEnabled) {
+ if (traceEnabled) {
LOG.trace(parent + " all the children finished their work, resume.");
}
return;
@@ -1438,7 +1452,7 @@ public class ProcedureExecutor<TEnvironment> {
final Procedure procedure, final Procedure[] subprocs) {
if (subprocs != null && !procedure.isFailed()) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
+ LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
}
store.insert(procedure, subprocs);
} else {
@@ -1464,7 +1478,7 @@ public class ProcedureExecutor<TEnvironment> {
private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
if (LOG.isTraceEnabled()) {
- LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
+ LOG.trace("Interrupt during " + proc + ". suspend and retry it later.", e);
}
// NOTE: We don't call Thread.currentThread().interrupt()
@@ -1530,7 +1544,7 @@ public class ProcedureExecutor<TEnvironment> {
@Override
public void run() {
- final boolean isTraceEnabled = LOG.isTraceEnabled();
+ final boolean traceEnabled = LOG.isTraceEnabled();
long lastUpdate = EnvironmentEdgeManager.currentTime();
while (isRunning() && keepAlive(lastUpdate)) {
final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
@@ -1539,7 +1553,7 @@ public class ProcedureExecutor<TEnvironment> {
store.setRunningProcedureCount(activeExecutorCount.incrementAndGet());
executionStartTime.set(EnvironmentEdgeManager.currentTime());
try {
- if (isTraceEnabled) {
+ if (traceEnabled) {
LOG.trace("Trying to start the execution of " + procedure);
}
executeProcedure(procedure);
@@ -1549,7 +1563,7 @@ public class ProcedureExecutor<TEnvironment> {
executionStartTime.set(Long.MAX_VALUE);
}
}
- LOG.debug("worker thread terminated " + this);
+ LOG.debug("Worker thread terminated " + this);
workerThreads.remove(this);
}
@@ -1691,7 +1705,7 @@ public class ProcedureExecutor<TEnvironment> {
sendStopSignal();
join(250);
if (i > 0 && (i % 8) == 0) {
- LOG.warn("waiting termination of thread " + getName() + ", " +
+ LOG.warn("Waiting termination of thread " + getName() + ", " +
StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
}
}
@@ -1767,7 +1781,7 @@ public class ProcedureExecutor<TEnvironment> {
// WARN the worker is stuck
stuckCount++;
- LOG.warn("found worker stuck " + worker +
+ LOG.warn("Worker stuck " + worker +
" run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
}
return stuckCount;
@@ -1785,7 +1799,7 @@ public class ProcedureExecutor<TEnvironment> {
final WorkerThread worker = new WorkerThread(threadGroup);
workerThreads.add(worker);
worker.start();
- LOG.debug("added a new worker thread " + worker);
+ LOG.debug("Added new worker thread " + worker);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 4465993..d4d5773 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -294,7 +294,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
@Override
public void setRunningProcedureCount(final int count) {
- LOG.debug("set running procedure count=" + count + " slots=" + slots.length);
+ LOG.debug("Set running procedure count=" + count + ", slots=" + slots.length);
this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
}
@@ -326,7 +326,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
flushLogId = initOldLogs(oldLogs);
} catch (FileNotFoundException e) {
- LOG.warn("someone else is active and deleted logs. retrying.", e);
+ LOG.warn("Someone else is active and deleted logs. retrying.", e);
oldLogs = getLogFiles();
continue;
}
@@ -334,7 +334,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
// Create new state-log
if (!rollWriter(flushLogId + 1)) {
// someone else has already created this log
- LOG.debug("someone else has already created log " + flushLogId);
+ LOG.debug("Someone else has already created log " + flushLogId);
continue;
}
@@ -428,7 +428,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
periodicRoll();
} catch (IOException e) {
- LOG.warn("unable to cleanup logs on load: " + e.getMessage(), e);
+ LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 9edc711..8aa2088 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -206,6 +206,18 @@ public class ProcedureTestingUtility {
}
}
+ public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) {
+ for (int i = 0; i < procIds.length; ++i) {
+ waitProcedure(procExecutor, procIds[i]);
+ }
+ }
+
+ public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) {
+ for (long procId : procExecutor.getActiveProcIds()) {
+ waitProcedure(procExecutor, procId);
+ }
+ }
+
public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
int stableRuns = 0;
while (stableRuns < 10) {