You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/10 20:11:15 UTC
[05/25] hbase git commit: HBASE-13202 Procedure v2 - core framework
http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
new file mode 100644
index 0000000..13f7bfa
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -0,0 +1,721 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+
+/**
+ * WAL implementation of the ProcedureStore.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class WALProcedureStore implements ProcedureStore {
+ private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
+
+ public interface LeaseRecovery {
+ void recoverFileLease(FileSystem fs, Path path) throws IOException;
+ }
+
+ private static final int MAX_RETRIES_BEFORE_ABORT = 3;
+
+ private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
+ private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
+
+ private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
+ new CopyOnWriteArrayList<ProcedureStoreListener>();
+
+ private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
+ private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition waitCond = lock.newCondition();
+ private final Condition slotCond = lock.newCondition();
+ private final Condition syncCond = lock.newCondition();
+
+ private final LeaseRecovery leaseRecovery;
+ private final Configuration conf;
+ private final FileSystem fs;
+ private final Path logDir;
+
+ private AtomicBoolean inSync = new AtomicBoolean(false);
+ private ArrayBlockingQueue<ByteSlot> slotsCache = null;
+ private Set<ProcedureWALFile> corruptedLogs = null;
+ private FSDataOutputStream stream = null;
+ private long totalSynced = 0;
+ private long flushLogId = 0;
+ private int slotIndex = 0;
+ private Thread syncThread;
+ private ByteSlot[] slots;
+ private int syncWaitMsec;
+
+ public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
+ final LeaseRecovery leaseRecovery) {
+ this.fs = fs;
+ this.conf = conf;
+ this.logDir = logDir;
+ this.leaseRecovery = leaseRecovery;
+ }
+
+ @Override
+ public void start(int numSlots) throws IOException {
+ if (running.getAndSet(true)) {
+ return;
+ }
+
+ // Init buffer slots
+ slots = new ByteSlot[numSlots];
+ slotsCache = new ArrayBlockingQueue(numSlots, true);
+ while (slotsCache.remainingCapacity() > 0) {
+ slotsCache.offer(new ByteSlot());
+ }
+
+ // Tunings
+ syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
+
+ // Init sync thread
+ syncThread = new Thread("WALProcedureStoreSyncThread") {
+ @Override
+ public void run() {
+ while (running.get()) {
+ try {
+ syncLoop();
+ } catch (IOException e) {
+ LOG.error("got an exception from the sync-loop", e);
+ sendAbortProcessSignal();
+ }
+ }
+ }
+ };
+ syncThread.start();
+ }
+
+ @Override
+ public void stop(boolean abort) {
+ if (!running.getAndSet(false)) {
+ return;
+ }
+
+ LOG.info("Stopping the WAL Procedure Store");
+ if (lock.tryLock()) {
+ try {
+ waitCond.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ if (!abort) {
+ try {
+ syncThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ // Close the writer
+ closeStream();
+
+ // Close the old logs
+ // they should be already closed, this is just in case the load fails
+ // and we call start() and then stop()
+ for (ProcedureWALFile log: logs) {
+ log.close();
+ }
+ logs.clear();
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
+ @Override
+ public int getNumThreads() {
+ return slots == null ? 0 : slots.length;
+ }
+
+ public ProcedureStoreTracker getStoreTracker() {
+ return storeTracker;
+ }
+
+ @Override
+ public void registerListener(ProcedureStoreListener listener) {
+ this.listeners.add(listener);
+ }
+
+ @Override
+ public boolean unregisterListener(ProcedureStoreListener listener) {
+ return this.listeners.remove(listener);
+ }
+
+ @Override
+ public void recoverLease() throws IOException {
+ LOG.info("Starting WAL Procedure Store lease recovery");
+ FileStatus[] oldLogs = getLogFiles();
+ while (running.get()) {
+ // Get Log-MaxID and recover lease on old logs
+ flushLogId = initOldLogs(oldLogs) + 1;
+
+ // Create new state-log
+ if (!rollWriter(flushLogId)) {
+ // someone else has already created this log
+ LOG.debug("someone else has already created log " + flushLogId);
+ continue;
+ }
+
+ // We have the lease on the log
+ oldLogs = getLogFiles();
+ if (getMaxLogId(oldLogs) > flushLogId) {
+ // Someone else created new logs
+ LOG.debug("someone else created new logs. expected maxLogId < " + flushLogId);
+ logs.getLast().removeFile();
+ continue;
+ }
+
+ LOG.info("lease acquired flushLogId=" + flushLogId);
+ break;
+ }
+ }
+
+ @Override
+ public Iterator<Procedure> load() throws IOException {
+ if (logs.isEmpty()) {
+ throw new RuntimeException("recoverLease() must be called before loading data");
+ }
+
+ // Nothing to do, If we have only the current log.
+ if (logs.size() == 1) {
+ LOG.debug("No state logs to replay");
+ return null;
+ }
+
+ // Load the old logs
+ final ArrayList<ProcedureWALFile> toRemove = new ArrayList<ProcedureWALFile>();
+ Iterator<ProcedureWALFile> it = logs.descendingIterator();
+ it.next(); // Skip the current log
+ try {
+ return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
+ @Override
+ public void removeLog(ProcedureWALFile log) {
+ toRemove.add(log);
+ }
+
+ @Override
+ public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
+ if (corruptedLogs == null) {
+ corruptedLogs = new HashSet<ProcedureWALFile>();
+ }
+ corruptedLogs.add(log);
+ // TODO: sideline corrupted log
+ }
+ });
+ } finally {
+ if (!toRemove.isEmpty()) {
+ for (ProcedureWALFile log: toRemove) {
+ removeLogFile(log);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void insert(final Procedure proc, final Procedure[] subprocs) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("insert " + proc + " subproc=" + Arrays.toString(subprocs));
+ }
+
+ ByteSlot slot = acquireSlot();
+ long logId = -1;
+ try {
+ // Serialize the insert
+ if (subprocs != null) {
+ ProcedureWALFormat.writeInsert(slot, proc, subprocs);
+ } else {
+ assert !proc.hasParent();
+ ProcedureWALFormat.writeInsert(slot, proc);
+ }
+
+ // Push the transaction data and wait until it is persisted
+ logId = pushData(slot);
+ } catch (IOException e) {
+ // We are not able to serialize the procedure.
+ // this is a code error, and we are not able to go on.
+ LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
+ " subprocs=" + Arrays.toString(subprocs), e);
+ throw new RuntimeException(e);
+ } finally {
+ releaseSlot(slot);
+ }
+
+ // Update the store tracker
+ synchronized (storeTracker) {
+ if (logId == flushLogId) {
+ storeTracker.insert(proc, subprocs);
+ }
+ }
+ }
+
+ @Override
+ public void update(final Procedure proc) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("update " + proc);
+ }
+
+ ByteSlot slot = acquireSlot();
+ long logId = -1;
+ try {
+ // Serialize the update
+ ProcedureWALFormat.writeUpdate(slot, proc);
+
+ // Push the transaction data and wait until it is persisted
+ logId = pushData(slot);
+ } catch (IOException e) {
+ // We are not able to serialize the procedure.
+ // this is a code error, and we are not able to go on.
+ LOG.fatal("Unable to serialize the procedure: " + proc, e);
+ throw new RuntimeException(e);
+ } finally {
+ releaseSlot(slot);
+ }
+
+ // Update the store tracker
+ boolean removeOldLogs = false;
+ synchronized (storeTracker) {
+ if (logId == flushLogId) {
+ storeTracker.update(proc);
+ removeOldLogs = storeTracker.isUpdated();
+ }
+ }
+
+ if (removeOldLogs) {
+ removeAllLogs(logId - 1);
+ }
+ }
+
+ @Override
+ public void delete(final long procId) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("delete " + procId);
+ }
+
+ ByteSlot slot = acquireSlot();
+ long logId = -1;
+ try {
+ // Serialize the delete
+ ProcedureWALFormat.writeDelete(slot, procId);
+
+ // Push the transaction data and wait until it is persisted
+ logId = pushData(slot);
+ } catch (IOException e) {
+ // We are not able to serialize the procedure.
+ // this is a code error, and we are not able to go on.
+ LOG.fatal("Unable to serialize the procedure: " + procId, e);
+ throw new RuntimeException(e);
+ } finally {
+ releaseSlot(slot);
+ }
+
+ boolean removeOldLogs = false;
+ synchronized (storeTracker) {
+ if (logId == flushLogId) {
+ storeTracker.delete(procId);
+ if (storeTracker.isEmpty()) {
+ removeOldLogs = rollWriterOrDie(logId + 1);
+ }
+ }
+ }
+
+ if (removeOldLogs) {
+ removeAllLogs(logId);
+ }
+ }
+
+ private ByteSlot acquireSlot() {
+ ByteSlot slot = slotsCache.poll();
+ return slot != null ? slot : new ByteSlot();
+ }
+
+ private void releaseSlot(final ByteSlot slot) {
+ slot.reset();
+ slotsCache.offer(slot);
+ }
+
+ private long pushData(final ByteSlot slot) {
+ assert !logs.isEmpty() : "recoverLease() must be called before inserting data";
+ long logId = -1;
+
+ lock.lock();
+ try {
+ // Wait for the sync to be completed
+ while (true) {
+ if (inSync.get()) {
+ syncCond.await();
+ } else if (slotIndex == slots.length) {
+ slotCond.signal();
+ syncCond.await();
+ } else {
+ break;
+ }
+ }
+
+ slots[slotIndex++] = slot;
+ logId = flushLogId;
+
+ // Notify that there is new data
+ if (slotIndex == 1) {
+ waitCond.signal();
+ }
+
+ // Notify that the slots are full
+ if (slotIndex == slots.length) {
+ slotCond.signal();
+ }
+ syncCond.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ sendAbortProcessSignal();
+ } finally {
+ lock.unlock();
+ }
+ return logId;
+ }
+
+ private void syncLoop() throws IOException {
+ inSync.set(false);
+ while (running.get()) {
+ lock.lock();
+ try {
+ // Wait until new data is available
+ if (slotIndex == 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for data. flushed=" + StringUtils.humanSize(totalSynced));
+ }
+ waitCond.await();
+ if (slotIndex == 0) {
+ // no data.. probably a stop()
+ continue;
+ }
+ }
+
+ // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
+ slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
+
+ inSync.set(true);
+ totalSynced += syncSlots();
+ slotIndex = 0;
+ inSync.set(false);
+ syncCond.signalAll();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ sendAbortProcessSignal();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private long syncSlots() {
+ int retry = 0;
+ long totalSynced = 0;
+ do {
+ try {
+ totalSynced = syncSlots(stream, slots, 0, slotIndex);
+ break;
+ } catch (Throwable e) {
+ if (++retry == MAX_RETRIES_BEFORE_ABORT) {
+ LOG.error("sync slot failed, abort.", e);
+ sendAbortProcessSignal();
+ }
+ }
+ } while (running.get());
+ return totalSynced;
+ }
+
+ protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
+ throws IOException {
+ long totalSynced = 0;
+ for (int i = 0; i < count; ++i) {
+ ByteSlot data = slots[offset + i];
+ data.writeTo(stream);
+ totalSynced += data.size();
+ }
+ stream.hsync();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Sync slots=" + count + '/' + slots.length +
+ " flushed=" + StringUtils.humanSize(totalSynced));
+ }
+ return totalSynced;
+ }
+
+ private void sendAbortProcessSignal() {
+ if (!this.listeners.isEmpty()) {
+ for (ProcedureStoreListener listener : this.listeners) {
+ listener.abortProcess();
+ }
+ }
+ }
+
+ private boolean rollWriterOrDie(final long logId) {
+ try {
+ return rollWriter(logId);
+ } catch (IOException e) {
+ LOG.warn("Unable to roll the log", e);
+ sendAbortProcessSignal();
+ return false;
+ }
+ }
+
+ private boolean rollWriter(final long logId) throws IOException {
+ ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
+ .setVersion(ProcedureWALFormat.HEADER_VERSION)
+ .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
+ .setMinProcId(storeTracker.getMinProcId())
+ .setLogId(logId)
+ .build();
+
+ FSDataOutputStream newStream = null;
+ Path newLogFile = null;
+ long startPos = -1;
+ try {
+ newLogFile = getLogFilePath(logId);
+ newStream = fs.create(newLogFile, false);
+ ProcedureWALFormat.writeHeader(newStream, header);
+ startPos = newStream.getPos();
+ } catch (FileAlreadyExistsException e) {
+ LOG.error("Log file with id=" + logId + " already exists", e);
+ return false;
+ }
+ lock.lock();
+ try {
+ closeStream();
+ synchronized (storeTracker) {
+ storeTracker.resetUpdates();
+ }
+ stream = newStream;
+ flushLogId = logId;
+ totalSynced = 0;
+ logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
+ } finally {
+ lock.unlock();
+ }
+ LOG.info("Roll new state log: " + logId);
+ return true;
+ }
+
+ private void closeStream() {
+ try {
+ if (stream != null) {
+ try {
+ ProcedureWALFormat.writeTrailer(stream, storeTracker);
+ } catch (IOException e) {
+ LOG.warn("Unable to write the trailer: " + e.getMessage());
+ }
+ stream.close();
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to close the stream", e);
+ } finally {
+ stream = null;
+ }
+ }
+
+ private void removeAllLogs(long lastLogId) {
+ LOG.info("Remove all state logs with ID less then " + lastLogId);
+ while (!logs.isEmpty()) {
+ ProcedureWALFile log = logs.getFirst();
+ if (lastLogId < log.getLogId()) {
+ break;
+ }
+
+ removeLogFile(log);
+ }
+ }
+
+ private boolean removeLogFile(final ProcedureWALFile log) {
+ try {
+ LOG.debug("remove log: " + log);
+ log.removeFile();
+ logs.remove(log);
+ } catch (IOException e) {
+ LOG.error("unable to remove log " + log, e);
+ return false;
+ }
+ return true;
+ }
+
+ public Set<ProcedureWALFile> getCorruptedLogs() {
+ return corruptedLogs;
+ }
+
+ // ==========================================================================
+ // FileSystem Log Files helpers
+ // ==========================================================================
+ public Path getLogDir() {
+ return this.logDir;
+ }
+
+ public FileSystem getFileSystem() {
+ return this.fs;
+ }
+
+ protected Path getLogFilePath(final long logId) throws IOException {
+ return new Path(logDir, String.format("state-%020d.log", logId));
+ }
+
+ private static long getLogIdFromName(final String name) {
+ int end = name.lastIndexOf(".log");
+ int start = name.lastIndexOf('-') + 1;
+ while (start < end) {
+ if (name.charAt(start) != '0')
+ break;
+ start++;
+ }
+ return Long.parseLong(name.substring(start, end));
+ }
+
+ private FileStatus[] getLogFiles() throws IOException {
+ try {
+ return fs.listStatus(logDir, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ String name = path.getName();
+ return name.startsWith("state-") && name.endsWith(".log");
+ }
+ });
+ } catch (FileNotFoundException e) {
+ LOG.warn("log directory not found: " + e.getMessage());
+ return null;
+ }
+ }
+
+ private long getMaxLogId(final FileStatus[] logFiles) {
+ long maxLogId = 0;
+ if (logFiles != null && logFiles.length > 0) {
+ for (int i = 0; i < logFiles.length; ++i) {
+ maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
+ }
+ }
+ return maxLogId;
+ }
+
+ /**
+ * @return Max-LogID of the specified log file set
+ */
+ private long initOldLogs(final FileStatus[] logFiles) throws IOException {
+ this.logs.clear();
+
+ long maxLogId = 0;
+ if (logFiles != null && logFiles.length > 0) {
+ for (int i = 0; i < logFiles.length; ++i) {
+ final Path logPath = logFiles[i].getPath();
+ leaseRecovery.recoverFileLease(fs, logPath);
+ maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
+
+ ProcedureWALFile log = initOldLog(logFiles[i]);
+ if (log != null) {
+ this.logs.add(log);
+ }
+ }
+ Collections.sort(this.logs);
+ initTrackerFromOldLogs();
+ }
+ return maxLogId;
+ }
+
+ private void initTrackerFromOldLogs() {
+ // TODO: Load the most recent tracker available
+ if (!logs.isEmpty()) {
+ ProcedureWALFile log = logs.getLast();
+ try {
+ log.readTracker(storeTracker);
+ } catch (IOException e) {
+ LOG.error("Unable to read tracker for " + log, e);
+ // try the next one...
+ storeTracker.clear();
+ storeTracker.setPartialFlag(true);
+ }
+ }
+ }
+
+ private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
+ ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
+ if (logFile.getLen() == 0) {
+ LOG.warn("Remove uninitialized log " + logFile);
+ log.removeFile();
+ return null;
+ }
+
+ LOG.debug("opening state-log: " + logFile);
+ try {
+ log.open();
+ } catch (ProcedureWALFormat.InvalidWALDataException e) {
+ LOG.warn("Remove uninitialized log " + logFile, e);
+ log.removeFile();
+ return null;
+ } catch (IOException e) {
+ String msg = "Unable to read state log: " + logFile;
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+
+ if (log.isCompacted()) {
+ try {
+ log.readTrailer();
+ } catch (IOException e) {
+ // unfinished compacted log throw it away
+ LOG.warn("Unfinished compacted log " + logFile, e);
+ log.removeFile();
+ return null;
+ }
+ }
+ return log;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
new file mode 100644
index 0000000..8904116
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Similar to the ByteArrayOutputStream, with the exception that we can prepend an header.
+ * e.g. you write some data and you want to prepend an header that contains the data len or cksum.
+ * <code>
+ * ByteSlot slot = new ByteSlot();
+ * // write data
+ * slot.write(...);
+ * slot.write(...);
+ * // write header with the size of the written data
+ * slot.markHead();
+ * slot.write(Bytes.toBytes(slot.size()));
+ * // flush to stream as [header, data]
+ * slot.writeTo(stream);
+ * </code>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ByteSlot extends OutputStream {
+ private static final int DOUBLE_GROW_LIMIT = 1 << 20;
+ private static final int GROW_ALIGN = 128;
+
+ private byte[] buf;
+ private int head;
+ private int size;
+
+ public void reset() {
+ head = 0;
+ size = 0;
+ }
+
+ public void markHead() {
+ head = size;
+ }
+
+ public int getHead() {
+ return head;
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public byte[] getBuffer() {
+ return buf;
+ }
+
+ public void writeAt(int offset, int b) {
+ head = Math.min(head, offset);
+ buf[offset] = (byte)b;
+ }
+
+ public void write(int b) {
+ ensureCapacity(size + 1);
+ buf[size++] = (byte)b;
+ }
+
+ public void write(byte[] b, int off, int len) {
+ ensureCapacity(size + len);
+ System.arraycopy(b, off, buf, size, len);
+ size += len;
+ }
+
+ public void writeTo(final OutputStream stream) throws IOException {
+ if (head != 0) {
+ stream.write(buf, head, size - head);
+ stream.write(buf, 0, head);
+ } else {
+ stream.write(buf, 0, size);
+ }
+ }
+
+ private void ensureCapacity(int minCapacity) {
+ minCapacity = (minCapacity + (GROW_ALIGN - 1)) & -GROW_ALIGN;
+ if (buf == null) {
+ buf = new byte[minCapacity];
+ } else if (minCapacity > buf.length) {
+ int newCapacity = buf.length << 1;
+ if (minCapacity > newCapacity || newCapacity > DOUBLE_GROW_LIMIT) {
+ newCapacity = minCapacity;
+ }
+ buf = Arrays.copyOf(buf, newCapacity);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
new file mode 100644
index 0000000..97134c2
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class StringUtils {
+ private StringUtils() {}
+
+ public static String humanTimeDiff(long timeDiff) {
+ StringBuilder buf = new StringBuilder();
+ long hours = timeDiff / (60*60*1000);
+ long rem = (timeDiff % (60*60*1000));
+ long minutes = rem / (60*1000);
+ rem = rem % (60*1000);
+ float seconds = rem / 1000.0f;
+
+ if (hours != 0){
+ buf.append(hours);
+ buf.append("hrs, ");
+ }
+ if (minutes != 0){
+ buf.append(minutes);
+ buf.append("mins, ");
+ }
+ if (hours > 0 || minutes > 0) {
+ buf.append(seconds);
+ buf.append("sec");
+ } else {
+ buf.append(String.format("%.4fsec", seconds));
+ }
+ return buf.toString();
+ }
+
+ public static String humanSize(double size) {
+ if (size >= (1L << 40)) return String.format("%.1fT", size / (1L << 40));
+ if (size >= (1L << 30)) return String.format("%.1fG", size / (1L << 30));
+ if (size >= (1L << 20)) return String.format("%.1fM", size / (1L << 20));
+ if (size >= (1L << 10)) return String.format("%.1fK", size / (1L << 10));
+ return String.format("%.0f", size);
+ }
+
+ public static boolean isEmpty(final String input) {
+ return input == null || input.length() == 0;
+ }
+
+ public static String buildString(final String... parts) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < parts.length; ++i) {
+ sb.append(parts[i]);
+ }
+ return sb.toString();
+ }
+
+ public static StringBuilder appendStrings(final StringBuilder sb, final String... parts) {
+ for (int i = 0; i < parts.length; ++i) {
+ sb.append(parts[i]);
+ }
+ return sb;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
new file mode 100644
index 0000000..f710ef4
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class TimeoutBlockingQueue<E> {
+ public static interface TimeoutRetriever<T> {
+ long getTimeout(T object);
+ TimeUnit getTimeUnit(T object);
+ }
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition waitCond = lock.newCondition();
+ private final TimeoutRetriever<? super E> timeoutRetriever;
+
+ private E[] objects;
+ private int head = 0;
+ private int tail = 0;
+
+ public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) {
+ this(32, timeoutRetriever);
+ }
+
+ @SuppressWarnings("unchecked")
+ public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> timeoutRetriever) {
+ this.objects = (E[])new Object[capacity];
+ this.timeoutRetriever = timeoutRetriever;
+ }
+
+ public void dump() {
+ for (int i = 0; i < objects.length; ++i) {
+ if (i == head) {
+ System.out.print("[" + objects[i] + "] ");
+ } else if (i == tail) {
+ System.out.print("]" + objects[i] + "[ ");
+ } else {
+ System.out.print(objects[i] + " ");
+ }
+ }
+ System.out.println();
+ }
+
+ public void clear() {
+ lock.lock();
+ try {
+ if (head != tail) {
+ for (int i = head; i < tail; ++i) {
+ objects[i] = null;
+ }
+ head = 0;
+ tail = 0;
+ waitCond.signal();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void add(E e) {
+ if (e == null) throw new NullPointerException();
+
+ lock.lock();
+ try {
+ addElement(e);
+ waitCond.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
+ public E poll() {
+ lock.lock();
+ try {
+ if (isEmpty()) {
+ waitCond.await();
+ return null;
+ }
+
+ E elem = objects[head];
+ long nanos = getNanosTimeout(elem);
+ nanos = waitCond.awaitNanos(nanos);
+ return nanos > 0 ? null : removeFirst();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int size() {
+ return tail - head;
+ }
+
+ public boolean isEmpty() {
+ return (tail - head) == 0;
+ }
+
+ public void signalAll() {
+ lock.lock();
+ try {
+ waitCond.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void addElement(E elem) {
+ int size = (tail - head);
+ if ((objects.length - size) == 0) {
+ int capacity = size + ((size < 64) ? (size + 2) : (size >> 1));
+ E[] newObjects = (E[])new Object[capacity];
+
+ if (compareTimeouts(objects[tail - 1], elem) <= 0) {
+ // Append
+ System.arraycopy(objects, head, newObjects, 0, tail);
+ tail -= head;
+ newObjects[tail++] = elem;
+ } else if (compareTimeouts(objects[head], elem) > 0) {
+ // Prepend
+ System.arraycopy(objects, head, newObjects, 1, tail);
+ newObjects[0] = elem;
+ tail -= (head - 1);
+ } else {
+ // Insert in the middle
+ int index = upperBound(head, tail - 1, elem);
+ int newIndex = (index - head);
+ System.arraycopy(objects, head, newObjects, 0, newIndex);
+ newObjects[newIndex] = elem;
+ System.arraycopy(objects, index, newObjects, newIndex + 1, tail - index);
+ tail -= (head - 1);
+ }
+ head = 0;
+ objects = newObjects;
+ } else {
+ if (tail == objects.length) {
+ // shift down |-----AAAAAAA|
+ tail -= head;
+ System.arraycopy(objects, head, objects, 0, tail);
+ head = 0;
+ }
+
+ if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) {
+ // Append
+ objects[tail++] = elem;
+ } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) {
+ // Prepend
+ objects[--head] = elem;
+ } else {
+ // Insert in the middle
+ int index = upperBound(head, tail - 1, elem);
+ System.arraycopy(objects, index, objects, index + 1, tail - index);
+ objects[index] = elem;
+ tail++;
+ }
+ }
+ }
+
+ private E removeFirst() {
+ E elem = objects[head];
+ objects[head] = null;
+ head = (head + 1) % objects.length;
+ if (head == 0) tail = 0;
+ return elem;
+ }
+
+ private int upperBound(int start, int end, E key) {
+ while (start < end) {
+ int mid = (start + end) >>> 1;
+ E mitem = objects[mid];
+ int cmp = compareTimeouts(mitem, key);
+ if (cmp > 0) {
+ end = mid;
+ } else {
+ start = mid + 1;
+ }
+ }
+ return start;
+ }
+
+ private int compareTimeouts(final E a, final E b) {
+ long t1 = getNanosTimeout(a);
+ long t2 = getNanosTimeout(b);
+ return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0;
+ }
+
+ private long getNanosTimeout(final E obj) {
+ TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
+ long timeout = timeoutRetriever.getTimeout(obj);
+ return unit.toNanos(timeout);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
new file mode 100644
index 0000000..6e7306c
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ProcedureTestingUtility {
+ private static final Log LOG = LogFactory.getLog(ProcedureTestingUtility.class);
+
+ private ProcedureTestingUtility() {
+ }
+
+ public static ProcedureStore createStore(final Configuration conf, final FileSystem fs,
+ final Path baseDir) throws IOException {
+ return createWalStore(conf, fs, baseDir);
+ }
+
+ public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
+ final Path logDir) throws IOException {
+ return new WALProcedureStore(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
+ @Override
+ public void recoverFileLease(FileSystem fs, Path path) throws IOException {
+ // no-op
+ }
+ });
+ }
+
+ public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor)
+ throws Exception {
+ restart(procExecutor, null);
+ }
+
+ public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
+ Runnable beforeStartAction) throws Exception {
+ ProcedureStore procStore = procExecutor.getStore();
+ int storeThreads = procExecutor.getNumThreads();
+ int execThreads = procExecutor.getNumThreads();
+ // stop
+ procExecutor.stop();
+ procStore.stop(false);
+ procExecutor.join();
+ // nothing running...
+ if (beforeStartAction != null) {
+ beforeStartAction.run();
+ }
+ // re-start
+ procStore.start(storeThreads);
+ procExecutor.start(execThreads);
+ }
+
+ public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
+ boolean value) {
+ if (procExecutor.testing == null) {
+ procExecutor.testing = new ProcedureExecutor.Testing();
+ }
+ procExecutor.testing.killBeforeStoreUpdate = value;
+ LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
+ }
+
+ public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
+ boolean value) {
+ if (procExecutor.testing == null) {
+ procExecutor.testing = new ProcedureExecutor.Testing();
+ }
+ procExecutor.testing.toggleKillBeforeStoreUpdate = value;
+ }
+
+ public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
+ if (procExecutor.testing == null) {
+ procExecutor.testing = new ProcedureExecutor.Testing();
+ }
+ procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
+ LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
+ }
+
+ public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
+ boolean value) {
+ ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
+ ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
+ }
+
+ public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
+ long procId = procExecutor.submitProcedure(proc);
+ waitProcedure(procExecutor, procId);
+ return procId;
+ }
+
+ public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
+ while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
+ Threads.sleepWithoutInterrupt(250);
+ }
+ }
+
+ public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
+ int stableRuns = 0;
+ while (stableRuns < 10) {
+ if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getRunnableSet().size() > 0) {
+ stableRuns = 0;
+ Threads.sleepWithoutInterrupt(100);
+ } else {
+ stableRuns++;
+ Threads.sleepWithoutInterrupt(25);
+ }
+ }
+ }
+
+ public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
+ long procId) {
+ assertFalse("expected a running proc", procExecutor.isFinished(procId));
+ assertEquals(null, procExecutor.getResult(procId));
+ }
+
+ public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
+ long procId) {
+ ProcedureResult result = procExecutor.getResult(procId);
+ assertTrue("expected procedure result", result != null);
+ assertProcNotFailed(result);
+ }
+
+ public static void assertProcNotFailed(final ProcedureResult result) {
+ Exception exception = result.getException();
+ String msg = exception != null ? exception.toString() : "no exception found";
+ assertFalse(msg, result.isFailed());
+ }
+
+ public static void assertIsAbortException(final ProcedureResult result) {
+ LOG.info(result.getException());
+ assertEquals(true, result.isFailed());
+ Throwable cause = result.getException().getCause();
+ assertTrue("expected abort exception, got "+ cause,
+ cause instanceof ProcedureAbortedException);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
new file mode 100644
index 0000000..7fe109e
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureExecution {
+ private static final Log LOG = LogFactory.getLog(TestProcedureExecution.class);
+
+ private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+ private static final Procedure NULL_PROC = null;
+
+ private ProcedureExecutor<Void> procExecutor;
+ private ProcedureStore procStore;
+
+ private HBaseCommonTestingUtility htu;
+ private FileSystem fs;
+ private Path testDir;
+ private Path logDir;
+
+ @Before
+ public void setUp() throws IOException {
+ htu = new HBaseCommonTestingUtility();
+ testDir = htu.getDataTestDir();
+ fs = testDir.getFileSystem(htu.getConfiguration());
+ assertTrue(testDir.depth() > 1);
+
+ logDir = new Path(testDir, "proc-logs");
+ procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+ procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
+ procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+ procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ procExecutor.stop();
+ procStore.stop(false);
+ fs.delete(logDir, true);
+ }
+
+ private static class TestProcedureException extends Exception {
+ public TestProcedureException(String msg) { super(msg); }
+ }
+
+ public static class TestSequentialProcedure extends SequentialProcedure<Void> {
+ private final Procedure[] subProcs;
+ private final List<String> state;
+ private final Exception failure;
+ private final String name;
+
+ public TestSequentialProcedure() {
+ throw new UnsupportedOperationException("recovery should not be triggered here");
+ }
+
+ public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) {
+ this.state = state;
+ this.subProcs = subProcs;
+ this.name = name;
+ this.failure = null;
+ }
+
+ public TestSequentialProcedure(String name, List<String> state, Exception failure) {
+ this.state = state;
+ this.subProcs = null;
+ this.name = name;
+ this.failure = failure;
+ }
+
+ @Override
+ protected Procedure[] execute(Void env) {
+ state.add(name + "-execute");
+ if (failure != null) {
+ setFailure(new RemoteProcedureException(name + "-failure", failure));
+ return null;
+ }
+ return subProcs;
+ }
+
+ @Override
+ protected void rollback(Void env) {
+ state.add(name + "-rollback");
+ }
+
+ @Override
+ protected boolean abort(Void env) {
+ state.add(name + "-abort");
+ return true;
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testBadSubprocList() {
+ List<String> state = new ArrayList<String>();
+ Procedure subProc2 = new TestSequentialProcedure("subProc2", state);
+ Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC);
+ Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
+ long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
+
+ // subProc1 has a "null" subprocedure which is catched as InvalidArgument
+ // failed state with 2 execute and 2 rollback
+ LOG.info(state);
+ ProcedureResult result = procExecutor.getResult(rootId);
+ LOG.info(result.getException());
+ assertTrue(state.toString(), result.isFailed());
+ assertTrue(result.getException().toString(),
+ result.getException().getCause() instanceof IllegalArgumentException);
+
+ assertEquals(state.toString(), 4, state.size());
+ assertEquals("rootProc-execute", state.get(0));
+ assertEquals("subProc1-execute", state.get(1));
+ assertEquals("subProc1-rollback", state.get(2));
+ assertEquals("rootProc-rollback", state.get(3));
+ }
+
+ @Test(timeout=30000)
+ public void testSingleSequentialProc() {
+ List<String> state = new ArrayList<String>();
+ Procedure subProc2 = new TestSequentialProcedure("subProc2", state);
+ Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
+ Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
+ long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
+
+ // successful state, with 3 execute
+ LOG.info(state);
+ ProcedureResult result = procExecutor.getResult(rootId);
+ ProcedureTestingUtility.assertProcNotFailed(result);
+ assertEquals(state.toString(), 3, state.size());
+ }
+
+ @Test(timeout=30000)
+ public void testSingleSequentialProcRollback() {
+ List<String> state = new ArrayList<String>();
+ Procedure subProc2 = new TestSequentialProcedure("subProc2", state,
+ new TestProcedureException("fail test"));
+ Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
+ Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
+ long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
+
+ // the 3rd proc fail, rollback after 2 successful execution
+ LOG.info(state);
+ ProcedureResult result = procExecutor.getResult(rootId);
+ LOG.info(result.getException());
+ assertTrue(state.toString(), result.isFailed());
+ assertTrue(result.getException().toString(),
+ result.getException().getCause() instanceof TestProcedureException);
+
+ assertEquals(state.toString(), 6, state.size());
+ assertEquals("rootProc-execute", state.get(0));
+ assertEquals("subProc1-execute", state.get(1));
+ assertEquals("subProc2-execute", state.get(2));
+ assertEquals("subProc2-rollback", state.get(3));
+ assertEquals("subProc1-rollback", state.get(4));
+ assertEquals("rootProc-rollback", state.get(5));
+ }
+
+ public static class TestFaultyRollback extends SequentialProcedure<Void> {
+ private int retries = 0;
+
+ public TestFaultyRollback() { }
+
+ @Override
+ protected Procedure[] execute(Void env) {
+ setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback"));
+ return null;
+ }
+
+ @Override
+ protected void rollback(Void env) throws IOException {
+ if (++retries < 3) {
+ LOG.info("inject rollback failure " + retries);
+ throw new IOException("injected failure number " + retries);
+ }
+ LOG.info("execute non faulty rollback step retries=" + retries);
+ }
+
+ @Override
+ protected boolean abort(Void env) { return false; }
+ }
+
+ @Test(timeout=30000)
+ public void testRollbackRetriableFailure() {
+ long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback());
+
+ ProcedureResult result = procExecutor.getResult(procId);
+ LOG.info(result.getException());
+ assertTrue("expected a failure", result.isFailed());
+ assertTrue(result.getException().toString(),
+ result.getException().getCause() instanceof TestProcedureException);
+ }
+
+ public static class TestWaitingProcedure extends SequentialProcedure<Void> {
+ private final List<String> state;
+ private final boolean hasChild;
+ private final String name;
+
+ public TestWaitingProcedure() {
+ throw new UnsupportedOperationException("recovery should not be triggered here");
+ }
+
+ public TestWaitingProcedure(String name, List<String> state, boolean hasChild) {
+ this.hasChild = hasChild;
+ this.state = state;
+ this.name = name;
+ }
+
+ @Override
+ protected Procedure[] execute(Void env) {
+ state.add(name + "-execute");
+ setState(ProcedureState.WAITING_TIMEOUT);
+ return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null;
+ }
+
+ @Override
+ protected void rollback(Void env) {
+ state.add(name + "-rollback");
+ }
+
+ @Override
+ protected boolean abort(Void env) {
+ state.add(name + "-abort");
+ return true;
+ }
+
+ public static class TestWaitChild extends SequentialProcedure<Void> {
+ private final List<String> state;
+ private final String name;
+
+ public TestWaitChild() {
+ throw new UnsupportedOperationException("recovery should not be triggered here");
+ }
+
+ public TestWaitChild(String name, List<String> state) {
+ this.name = name;
+ this.state = state;
+ }
+
+ @Override
+ protected Procedure[] execute(Void env) {
+ state.add(name + "-child-execute");
+ return null;
+ }
+
+ @Override
+ protected void rollback(Void env) {
+ state.add(name + "-child-rollback");
+ }
+
+ @Override
+ protected boolean abort(Void env) {
+ state.add(name + "-child-abort");
+ return true;
+ }
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testAbortTimeout() {
+ final int PROC_TIMEOUT_MSEC = 2500;
+ List<String> state = new ArrayList<String>();
+ Procedure proc = new TestWaitingProcedure("wproc", state, false);
+ proc.setTimeout(PROC_TIMEOUT_MSEC);
+ long startTime = EnvironmentEdgeManager.currentTime();
+ long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+ long execTime = EnvironmentEdgeManager.currentTime() - startTime;
+ LOG.info(state);
+ assertTrue("we didn't wait enough execTime=" + execTime, execTime >= PROC_TIMEOUT_MSEC);
+ ProcedureResult result = procExecutor.getResult(rootId);
+ LOG.info(result.getException());
+ assertTrue(state.toString(), result.isFailed());
+ assertTrue(result.getException().toString(),
+ result.getException().getCause() instanceof TimeoutException);
+ assertEquals(state.toString(), 2, state.size());
+ assertEquals("wproc-execute", state.get(0));
+ assertEquals("wproc-rollback", state.get(1));
+ }
+
+ @Test(timeout=30000)
+ public void testAbortTimeoutWithChildren() {
+ List<String> state = new ArrayList<String>();
+ Procedure proc = new TestWaitingProcedure("wproc", state, true);
+ proc.setTimeout(2500);
+ long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+ LOG.info(state);
+ ProcedureResult result = procExecutor.getResult(rootId);
+ LOG.info(result.getException());
+ assertTrue(state.toString(), result.isFailed());
+ assertTrue(result.getException().toString(),
+ result.getException().getCause() instanceof TimeoutException);
+ assertEquals(state.toString(), 4, state.size());
+ assertEquals("wproc-execute", state.get(0));
+ assertEquals("wproc-child-execute", state.get(1));
+ assertEquals("wproc-child-rollback", state.get(2));
+ assertEquals("wproc-rollback", state.get(3));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
new file mode 100644
index 0000000..e36a295
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureFairRunQueues {
+ private static class TestRunQueue implements ProcedureFairRunQueues.FairObject {
+ private final int priority;
+ private final String name;
+
+ private boolean available = true;
+
+ public TestRunQueue(String name, int priority) {
+ this.name = name;
+ this.priority = priority;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ private void setAvailable(boolean available) {
+ this.available = available;
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return available;
+ }
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+ }
+
+ @Test
+ public void testEmptyFairQueues() throws Exception {
+ ProcedureFairRunQueues<String, TestRunQueue> fairq
+ = new ProcedureFairRunQueues<String, TestRunQueue>(1);
+ for (int i = 0; i < 3; ++i) {
+ assertEquals(null, fairq.poll());
+ }
+ }
+
+ @Test
+ public void testFairQueues() throws Exception {
+ ProcedureFairRunQueues<String, TestRunQueue> fairq
+ = new ProcedureFairRunQueues<String, TestRunQueue>(1);
+ TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
+ TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
+ TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
+
+ for (int i = 0; i < 3; ++i) {
+ assertEquals(a, fairq.poll());
+ assertEquals(b, fairq.poll());
+ assertEquals(m, fairq.poll());
+ assertEquals(m, fairq.poll());
+ }
+ }
+
+ @Test
+ public void testFairQueuesNotAvailable() throws Exception {
+ ProcedureFairRunQueues<String, TestRunQueue> fairq
+ = new ProcedureFairRunQueues<String, TestRunQueue>(1);
+ TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
+ TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
+ TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
+
+ // m is not available
+ m.setAvailable(false);
+ for (int i = 0; i < 3; ++i) {
+ assertEquals(a, fairq.poll());
+ assertEquals(b, fairq.poll());
+ }
+
+ // m is available
+ m.setAvailable(true);
+ for (int i = 0; i < 3; ++i) {
+ assertEquals(m, fairq.poll());
+ assertEquals(m, fairq.poll());
+ assertEquals(a, fairq.poll());
+ assertEquals(b, fairq.poll());
+ }
+
+ // b is not available
+ b.setAvailable(false);
+ for (int i = 0; i < 3; ++i) {
+ assertEquals(m, fairq.poll());
+ assertEquals(m, fairq.poll());
+ assertEquals(a, fairq.poll());
+ }
+
+ assertEquals(m, fairq.poll());
+ m.setAvailable(false);
+ // m should be fetched next, but is no longer available
+ assertEquals(a, fairq.poll());
+ assertEquals(a, fairq.poll());
+ b.setAvailable(true);
+ for (int i = 0; i < 3; ++i) {
+ assertEquals(b, fairq.poll());
+ assertEquals(a, fairq.poll());
+ }
+ }
+
+ @Test
+ public void testFairQueuesDelete() throws Exception {
+ ProcedureFairRunQueues<String, TestRunQueue> fairq
+ = new ProcedureFairRunQueues<String, TestRunQueue>(1);
+ TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
+ TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
+ TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
+
+ // Fetch A and then remove it
+ assertEquals(a, fairq.poll());
+ assertEquals(a, fairq.remove("A"));
+
+ // Fetch B and then remove it
+ assertEquals(b, fairq.poll());
+ assertEquals(b, fairq.remove("B"));
+
+ // Fetch M and then remove it
+ assertEquals(m, fairq.poll());
+ assertEquals(m, fairq.remove("M"));
+
+ // nothing left
+ assertEquals(null, fairq.poll());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
new file mode 100644
index 0000000..0b7395b
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureRecovery {
+ private static final Log LOG = LogFactory.getLog(TestProcedureRecovery.class);
+
+ private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+ private static final Procedure NULL_PROC = null;
+
+ private static ProcedureExecutor<Void> procExecutor;
+ private static ProcedureStore procStore;
+ private static int procSleepInterval;
+
+ private HBaseCommonTestingUtility htu;
+ private FileSystem fs;
+ private Path testDir;
+ private Path logDir;
+
+ @Before
+ public void setUp() throws IOException {
+ htu = new HBaseCommonTestingUtility();
+ testDir = htu.getDataTestDir();
+ fs = testDir.getFileSystem(htu.getConfiguration());
+ assertTrue(testDir.depth() > 1);
+
+ logDir = new Path(testDir, "proc-logs");
+ procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+ procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
+ procExecutor.testing = new ProcedureExecutor.Testing();
+ procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+ procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
+ procSleepInterval = 0;
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ procExecutor.stop();
+ procStore.stop(false);
+ fs.delete(logDir, true);
+ }
+
+ private void restart() throws Exception {
+ dumpLogDirState();
+ ProcedureTestingUtility.restart(procExecutor);
+ dumpLogDirState();
+ }
+
+ public static class TestSingleStepProcedure extends SequentialProcedure<Void> {
+ private int step = 0;
+
+ public TestSingleStepProcedure() { }
+
+ @Override
+ protected Procedure[] execute(Void env) {
+ LOG.debug("execute procedure " + this + " step=" + step);
+ step++;
+ setResult(Bytes.toBytes(step));
+ return null;
+ }
+
+ @Override
+ protected void rollback(Void env) { }
+
+ @Override
+ protected boolean abort(Void env) { return true; }
+ }
+
+ public static class BaseTestStepProcedure extends SequentialProcedure<Void> {
+ private AtomicBoolean abort = new AtomicBoolean(false);
+ private int step = 0;
+
+ @Override
+ protected Procedure[] execute(Void env) {
+ LOG.debug("execute procedure " + this + " step=" + step);
+ ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
+ step++;
+ Threads.sleepWithoutInterrupt(procSleepInterval);
+ if (isAborted()) {
+ setFailure(new RemoteProcedureException(getClass().getName(),
+ new ProcedureAbortedException(
+ "got an abort at " + getClass().getName() + " step=" + step)));
+ return null;
+ }
+ return null;
+ }
+
+ @Override
+ protected void rollback(Void env) {
+ LOG.debug("rollback procedure " + this + " step=" + step);
+ ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
+ step++;
+ }
+
+ @Override
+ protected boolean abort(Void env) {
+ abort.set(true);
+ return true;
+ }
+
+ private boolean isAborted() {
+ boolean aborted = abort.get();
+ BaseTestStepProcedure proc = this;
+ while (proc.hasParent() && !aborted) {
+ proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId());
+ aborted = proc.isAborted();
+ }
+ return aborted;
+ }
+ }
+
+ public static class TestMultiStepProcedure extends BaseTestStepProcedure {
+ public TestMultiStepProcedure() { }
+
+ @Override
+ public Procedure[] execute(Void env) {
+ super.execute(env);
+ return isFailed() ? null : new Procedure[] { new Step1Procedure() };
+ }
+
+ public static class Step1Procedure extends BaseTestStepProcedure {
+ public Step1Procedure() { }
+
+ @Override
+ protected Procedure[] execute(Void env) {
+ super.execute(env);
+ return isFailed() ? null : new Procedure[] { new Step2Procedure() };
+ }
+ }
+
+ public static class Step2Procedure extends BaseTestStepProcedure {
+ public Step2Procedure() { }
+ }
+ }
+
+ @Test
+ public void testNoopLoad() throws Exception {
+ restart();
+ }
+
+ @Test(timeout=30000)
+ public void testSingleStepProcRecovery() throws Exception {
+ Procedure proc = new TestSingleStepProcedure();
+ procExecutor.testing.killBeforeStoreUpdate = true;
+ long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+ assertFalse(procExecutor.isRunning());
+ procExecutor.testing.killBeforeStoreUpdate = false;
+
+ // Restart and verify that the procedures restart
+ long restartTs = EnvironmentEdgeManager.currentTime();
+ restart();
+ waitProcedure(procId);
+ ProcedureResult result = procExecutor.getResult(procId);
+ assertTrue(result.getLastUpdate() > restartTs);
+ ProcedureTestingUtility.assertProcNotFailed(result);
+ assertEquals(1, Bytes.toInt(result.getResult()));
+ long resultTs = result.getLastUpdate();
+
+ // Verify that after another restart the result is still there
+ restart();
+ result = procExecutor.getResult(procId);
+ ProcedureTestingUtility.assertProcNotFailed(result);
+ assertEquals(resultTs, result.getLastUpdate());
+ assertEquals(1, Bytes.toInt(result.getResult()));
+ }
+
+ @Test(timeout=30000)
+ public void testMultiStepProcRecovery() throws Exception {
+ // Step 0 - kill
+ Procedure proc = new TestMultiStepProcedure();
+ long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 0 exec && Step 1 - kill
+ restart();
+ waitProcedure(procId);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 1 exec && step 2 - kill
+ restart();
+ waitProcedure(procId);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 2 exec
+ restart();
+ waitProcedure(procId);
+ assertTrue(procExecutor.isRunning());
+
+ // The procedure is completed
+ ProcedureResult result = procExecutor.getResult(procId);
+ ProcedureTestingUtility.assertProcNotFailed(result);
+ }
+
+ @Test(timeout=30000)
+ public void testMultiStepRollbackRecovery() throws Exception {
+ // Step 0 - kill
+ Procedure proc = new TestMultiStepProcedure();
+ long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 0 exec && Step 1 - kill
+ restart();
+ waitProcedure(procId);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 1 exec && step 2 - kill
+ restart();
+ waitProcedure(procId);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 2 exec - rollback - kill
+ procSleepInterval = 2500;
+ restart();
+ assertTrue(procExecutor.abort(procId));
+ waitProcedure(procId);
+ assertFalse(procExecutor.isRunning());
+
+ // rollback - kill
+ restart();
+ waitProcedure(procId);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // rollback - complete
+ restart();
+ waitProcedure(procId);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // Restart the executor and get the result
+ restart();
+ waitProcedure(procId);
+
+ // The procedure is completed
+ ProcedureResult result = procExecutor.getResult(procId);
+ ProcedureTestingUtility.assertIsAbortException(result);
+ }
+
+ public static class TestStateMachineProcedure
+ extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
+ enum State { STATE_1, STATE_2, STATE_3, DONE }
+
+ public TestStateMachineProcedure() {}
+
+ private AtomicBoolean aborted = new AtomicBoolean(false);
+ private int iResult = 0;
+
+ @Override
+ protected StateMachineProcedure.Flow executeFromState(Void env, State state) {
+ switch (state) {
+ case STATE_1:
+ LOG.info("execute step 1 " + this);
+ setNextState(State.STATE_2);
+ iResult += 3;
+ break;
+ case STATE_2:
+ LOG.info("execute step 2 " + this);
+ setNextState(State.STATE_3);
+ iResult += 5;
+ break;
+ case STATE_3:
+ LOG.info("execute step 3 " + this);
+ Threads.sleepWithoutInterrupt(procSleepInterval);
+ if (aborted.get()) {
+ LOG.info("aborted step 3 " + this);
+ setAbortFailure("test", "aborted");
+ break;
+ }
+ setNextState(State.DONE);
+ iResult += 7;
+ setResult(Bytes.toBytes(iResult));
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(Void env, final State state) {
+ switch (state) {
+ case STATE_1:
+ LOG.info("rollback step 1 " + this);
+ break;
+ case STATE_2:
+ LOG.info("rollback step 2 " + this);
+ break;
+ case STATE_3:
+ LOG.info("rollback step 3 " + this);
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ protected State getState(final int stateId) {
+ return State.values()[stateId];
+ }
+
+ @Override
+ protected int getStateId(final State state) {
+ return state.ordinal();
+ }
+
+ @Override
+ protected State getInitialState() {
+ return State.STATE_1;
+ }
+
+ @Override
+ protected boolean abort(Void env) {
+ aborted.set(true);
+ return true;
+ }
+
+ @Override
+ protected void serializeStateData(final OutputStream stream) throws IOException {
+ super.serializeStateData(stream);
+ stream.write(Bytes.toBytes(iResult));
+ }
+
+ @Override
+ protected void deserializeStateData(final InputStream stream) throws IOException {
+ super.deserializeStateData(stream);
+ byte[] data = new byte[4];
+ stream.read(data);
+ iResult = Bytes.toInt(data);
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testStateMachineRecovery() throws Exception {
+ ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
+ ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
+
+ // Step 1 - kill
+ Procedure proc = new TestStateMachineProcedure();
+ long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 1 exec && Step 2 - kill
+ restart();
+ waitProcedure(procId);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 2 exec && step 3 - kill
+ restart();
+ waitProcedure(procId);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 3 exec
+ restart();
+ waitProcedure(procId);
+ assertTrue(procExecutor.isRunning());
+
+ // The procedure is completed
+ ProcedureResult result = procExecutor.getResult(procId);
+ ProcedureTestingUtility.assertProcNotFailed(result);
+ assertEquals(15, Bytes.toInt(result.getResult()));
+ }
+
+ @Test(timeout=30000)
+ public void testStateMachineRollbackRecovery() throws Exception {
+ ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
+ ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
+
+ // Step 1 - kill
+ Procedure proc = new TestStateMachineProcedure();
+ long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 1 exec && Step 2 - kill
+ restart();
+ waitProcedure(procId);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 2 exec && step 3 - kill
+ restart();
+ waitProcedure(procId);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // Step 3 exec - rollback step 3 - kill
+ procSleepInterval = 2500;
+ restart();
+ assertTrue(procExecutor.abort(procId));
+ waitProcedure(procId);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+ assertFalse(procExecutor.isRunning());
+
+ // Rollback step 3 - rollback step 2 - kill
+ restart();
+ waitProcedure(procId);
+ assertFalse(procExecutor.isRunning());
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+
+ // Rollback step 2 - step 1 - kill
+ restart();
+ waitProcedure(procId);
+ assertFalse(procExecutor.isRunning());
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+
+ // Rollback step 1 - complete
+ restart();
+ waitProcedure(procId);
+ assertTrue(procExecutor.isRunning());
+
+ // The procedure is completed
+ ProcedureResult result = procExecutor.getResult(procId);
+ ProcedureTestingUtility.assertIsAbortException(result);
+ }
+
+ private void waitProcedure(final long procId) {
+ ProcedureTestingUtility.waitProcedure(procExecutor, procId);
+ dumpLogDirState();
+ }
+
+ private void dumpLogDirState() {
+ try {
+ FileStatus[] files = fs.listStatus(logDir);
+ if (files != null && files.length > 0) {
+ for (FileStatus file: files) {
+ assertTrue(file.toString(), file.isFile());
+ LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
+ }
+ } else {
+ LOG.debug("no files under: " + logDir);
+ }
+ } catch (IOException e) {
+ LOG.warn("Unable to dump " + logDir, e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/04246c6c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
new file mode 100644
index 0000000..88645ed
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, LargeTests.class})
+public class TestProcedureReplayOrder {
+ private static final Log LOG = LogFactory.getLog(TestProcedureReplayOrder.class);
+
+ private static final Procedure NULL_PROC = null;
+
+ private ProcedureExecutor<Void> procExecutor;
+ private TestProcedureEnv procEnv;
+ private ProcedureStore procStore;
+
+ private HBaseCommonTestingUtility htu;
+ private FileSystem fs;
+ private Path testDir;
+ private Path logDir;
+
+ @Before
+ public void setUp() throws IOException {
+ htu = new HBaseCommonTestingUtility();
+ htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 10);
+
+ testDir = htu.getDataTestDir();
+ fs = testDir.getFileSystem(htu.getConfiguration());
+ assertTrue(testDir.depth() > 1);
+
+ logDir = new Path(testDir, "proc-logs");
+ procEnv = new TestProcedureEnv();
+ procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+ procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+ procStore.start(24);
+ procExecutor.start(1);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ procExecutor.stop();
+ procStore.stop(false);
+ fs.delete(logDir, true);
+ }
+
+ @Test(timeout=90000)
+ public void testSingleStepReplyOrder() throws Exception {
+ // avoid the procedure to be runnable
+ procEnv.setAcquireLock(false);
+
+ // submit the procedures
+ submitProcedures(16, 25, TestSingleStepProcedure.class);
+
+ // restart the executor and allow the procedures to run
+ ProcedureTestingUtility.restart(procExecutor, new Runnable() {
+ @Override
+ public void run() {
+ procEnv.setAcquireLock(true);
+ }
+ });
+
+ // wait the execution of all the procedures and
+ // assert that the execution order was sorted by procId
+ ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
+ procEnv.assertSortedExecList();
+
+ // TODO: FIXME: This should be revisited
+ }
+
+ @Ignore
+ @Test(timeout=90000)
+ public void testMultiStepReplyOrder() throws Exception {
+ // avoid the procedure to be runnable
+ procEnv.setAcquireLock(false);
+
+ // submit the procedures
+ submitProcedures(16, 10, TestTwoStepProcedure.class);
+
+ // restart the executor and allow the procedures to run
+ ProcedureTestingUtility.restart(procExecutor, new Runnable() {
+ @Override
+ public void run() {
+ procEnv.setAcquireLock(true);
+ }
+ });
+
+ fail("TODO: FIXME: NOT IMPLEMENT REPLAY ORDER");
+ }
+
+ private void submitProcedures(final int nthreads, final int nprocPerThread,
+ final Class<?> procClazz) throws Exception {
+ Thread[] submitThreads = new Thread[nthreads];
+ for (int i = 0; i < submitThreads.length; ++i) {
+ submitThreads[i] = new Thread() {
+ @Override
+ public void run() {
+ for (int i = 0; i < nprocPerThread; ++i) {
+ try {
+ procExecutor.submitProcedure((Procedure)procClazz.newInstance());
+ } catch (InstantiationException|IllegalAccessException e) {
+ LOG.error("unable to instantiate the procedure", e);
+ fail("failure during the proc.newInstance(): " + e.getMessage());
+ }
+ }
+ }
+ };
+ }
+
+ for (int i = 0; i < submitThreads.length; ++i) {
+ submitThreads[i].start();
+ }
+
+ for (int i = 0; i < submitThreads.length; ++i) {
+ submitThreads[i].join();
+ }
+ }
+
+ private static class TestProcedureEnv {
+ private ArrayList<Long> execList = new ArrayList<Long>();
+ private boolean acquireLock = true;
+
+ public void setAcquireLock(boolean acquireLock) {
+ this.acquireLock = acquireLock;
+ }
+
+ public boolean canAcquireLock() {
+ return acquireLock;
+ }
+
+ public void addToExecList(final Procedure proc) {
+ execList.add(proc.getProcId());
+ }
+
+ public ArrayList<Long> getExecList() {
+ return execList;
+ }
+
+ public void assertSortedExecList() {
+ LOG.debug("EXEC LIST: " + execList);
+ for (int i = 1; i < execList.size(); ++i) {
+ assertTrue("exec list not sorted: " + execList.get(i-1) + " >= " + execList.get(i),
+ execList.get(i-1) < execList.get(i));
+ }
+ }
+ }
+
+ public static class TestSingleStepProcedure extends SequentialProcedure<TestProcedureEnv> {
+ public TestSingleStepProcedure() { }
+
+ @Override
+ protected Procedure[] execute(TestProcedureEnv env) {
+ LOG.debug("execute procedure " + this);
+ env.addToExecList(this);
+ return null;
+ }
+
+ protected boolean acquireLock(final TestProcedureEnv env) {
+ return env.canAcquireLock();
+ }
+
+ @Override
+ protected void rollback(TestProcedureEnv env) { }
+
+ @Override
+ protected boolean abort(TestProcedureEnv env) { return true; }
+ }
+
+ public static class TestTwoStepProcedure extends SequentialProcedure<TestProcedureEnv> {
+ public TestTwoStepProcedure() { }
+
+ @Override
+ protected Procedure[] execute(TestProcedureEnv env) {
+ LOG.debug("execute procedure " + this);
+ env.addToExecList(this);
+ return new Procedure[] { new TestSingleStepProcedure() };
+ }
+
+ protected boolean acquireLock(final TestProcedureEnv env) {
+ return true;
+ }
+
+ @Override
+ protected void rollback(TestProcedureEnv env) { }
+
+ @Override
+ protected boolean abort(TestProcedureEnv env) { return true; }
+ }
+}
\ No newline at end of file