You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2015/12/17 00:45:17 UTC
[06/12] hbase git commit: HBASE-14947 WALProcedureStore improvements
HBASE-14947 WALProcedureStore improvements
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/60d33ce3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/60d33ce3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/60d33ce3
Branch: refs/heads/hbase-12439
Commit: 60d33ce34191533bb858852584bd9bddfeb16a23
Parents: b8539c6
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Tue Dec 15 10:15:18 2015 -0800
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Tue Dec 15 12:52:34 2015 -0800
----------------------------------------------------------------------
.../procedure2/store/ProcedureStoreTracker.java | 21 +-
.../procedure2/store/wal/WALProcedureStore.java | 294 ++++++++++---------
.../hbase/procedure2/TestProcedureRecovery.java | 4 +-
.../store/TestProcedureStoreTracker.java | 35 +--
.../store/wal/TestWALProcedureStore.java | 51 +++-
5 files changed, 203 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/60d33ce3/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 8516f61..6823288 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -27,7 +27,6 @@ import java.util.TreeMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
/**
@@ -356,25 +355,19 @@ public class ProcedureStoreTracker {
}
}
- public void insert(final Procedure proc, final Procedure[] subprocs) {
- insert(proc.getProcId());
- if (subprocs != null) {
- for (int i = 0; i < subprocs.length; ++i) {
- insert(subprocs[i].getProcId());
- }
- }
- }
-
- public void update(final Procedure proc) {
- update(proc.getProcId());
- }
-
public void insert(long procId) {
BitSetNode node = getOrCreateNode(procId);
node.update(procId);
trackProcIds(procId);
}
+ public void insert(final long procId, final long[] subProcIds) {
+ update(procId);
+ for (int i = 0; i < subProcIds.length; ++i) {
+ insert(subProcIds[i]);
+ }
+ }
+
public void update(long procId) {
Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
assert entry != null : "expected node to update procId=" + procId;
http://git-wip-us.apache.org/repos/asf/hbase/blob/60d33ce3/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index ec42d6a..20709a9 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -100,7 +100,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
- private final AtomicLong inactiveLogsMaxId = new AtomicLong(0);
private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition();
private final Condition slotCond = lock.newCondition();
@@ -191,19 +190,16 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
LOG.info("Stopping the WAL Procedure Store");
- if (lock.tryLock()) {
- try {
- waitCond.signalAll();
- syncCond.signalAll();
- } finally {
- lock.unlock();
- }
- }
+ sendStopSignal();
if (!abort) {
try {
- syncThread.join();
+ while (syncThread.isAlive()) {
+ sendStopSignal();
+ syncThread.join(250);
+ }
} catch (InterruptedException e) {
+ LOG.warn("join interrupted", e);
Thread.currentThread().interrupt();
}
}
@@ -220,6 +216,17 @@ public class WALProcedureStore extends ProcedureStoreBase {
logs.clear();
}
+ private void sendStopSignal() {
+ if (lock.tryLock()) {
+ try {
+ waitCond.signalAll();
+ syncCond.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
@Override
public int getNumThreads() {
return slots == null ? 0 : slots.length;
@@ -239,31 +246,36 @@ public class WALProcedureStore extends ProcedureStoreBase {
@Override
public void recoverLease() throws IOException {
- LOG.info("Starting WAL Procedure Store lease recovery");
- FileStatus[] oldLogs = getLogFiles();
- while (isRunning()) {
- // Get Log-MaxID and recover lease on old logs
- flushLogId = initOldLogs(oldLogs);
-
- // Create new state-log
- if (!rollWriter(flushLogId + 1)) {
- // someone else has already created this log
- LOG.debug("someone else has already created log " + flushLogId);
- continue;
- }
+ lock.lock();
+ try {
+ LOG.info("Starting WAL Procedure Store lease recovery");
+ FileStatus[] oldLogs = getLogFiles();
+ while (isRunning()) {
+ // Get Log-MaxID and recover lease on old logs
+ flushLogId = initOldLogs(oldLogs);
+
+ // Create new state-log
+ if (!rollWriter(flushLogId + 1)) {
+ // someone else has already created this log
+ LOG.debug("someone else has already created log " + flushLogId);
+ continue;
+ }
- // We have the lease on the log
- oldLogs = getLogFiles();
- if (getMaxLogId(oldLogs) > flushLogId) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
+ // We have the lease on the log
+ oldLogs = getLogFiles();
+ if (getMaxLogId(oldLogs) > flushLogId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
+ }
+ logs.getLast().removeFile();
+ continue;
}
- logs.getLast().removeFile();
- continue;
- }
- LOG.info("Lease acquired for flushLogId: " + flushLogId);
- break;
+ LOG.info("Lease acquired for flushLogId: " + flushLogId);
+ break;
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -335,18 +347,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
ByteSlot slot = acquireSlot();
- long logId = -1;
try {
// Serialize the insert
+ long[] subProcIds = null;
if (subprocs != null) {
ProcedureWALFormat.writeInsert(slot, proc, subprocs);
+ subProcIds = new long[subprocs.length];
+ for (int i = 0; i < subprocs.length; ++i) {
+ subProcIds[i] = subprocs[i].getProcId();
+ }
} else {
assert !proc.hasParent();
ProcedureWALFormat.writeInsert(slot, proc);
}
// Push the transaction data and wait until it is persisted
- pushData(slot);
+ pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
} catch (IOException e) {
// We are not able to serialize the procedure.
// this is a code error, and we are not able to go on.
@@ -356,14 +372,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
} finally {
releaseSlot(slot);
}
-
- // Update the store tracker
- synchronized (storeTracker) {
- storeTracker.insert(proc, subprocs);
- if (logId == flushLogId) {
- checkAndTryRoll();
- }
- }
}
@Override
@@ -373,13 +381,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
ByteSlot slot = acquireSlot();
- long logId = -1;
try {
// Serialize the update
ProcedureWALFormat.writeUpdate(slot, proc);
// Push the transaction data and wait until it is persisted
- logId = pushData(slot);
+ pushData(PushType.UPDATE, slot, proc.getProcId(), null);
} catch (IOException e) {
// We are not able to serialize the procedure.
// this is a code error, and we are not able to go on.
@@ -388,20 +395,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
} finally {
releaseSlot(slot);
}
-
- // Update the store tracker
- boolean removeOldLogs = false;
- synchronized (storeTracker) {
- storeTracker.update(proc);
- if (logId == flushLogId) {
- removeOldLogs = storeTracker.isUpdated();
- checkAndTryRoll();
- }
- }
-
- if (removeOldLogs) {
- setInactiveLogsMaxId(logId - 1);
- }
}
@Override
@@ -411,13 +404,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
ByteSlot slot = acquireSlot();
- long logId = -1;
try {
// Serialize the delete
ProcedureWALFormat.writeDelete(slot, procId);
// Push the transaction data and wait until it is persisted
- logId = pushData(slot);
+ pushData(PushType.DELETE, slot, procId, null);
} catch (IOException e) {
// We are not able to serialize the procedure.
// this is a code error, and we are not able to go on.
@@ -426,22 +418,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
} finally {
releaseSlot(slot);
}
-
- boolean removeOldLogs = false;
- synchronized (storeTracker) {
- storeTracker.delete(procId);
- if (logId == flushLogId) {
- if (storeTracker.isEmpty() || storeTracker.isUpdated()) {
- removeOldLogs = checkAndTryRoll();
- } else {
- checkAndTryRoll();
- }
- }
- }
-
- if (removeOldLogs) {
- setInactiveLogsMaxId(logId);
- }
}
private ByteSlot acquireSlot() {
@@ -454,7 +430,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
slotsCache.offer(slot);
}
- private long pushData(final ByteSlot slot) {
+ private enum PushType { INSERT, UPDATE, DELETE };
+
+ private long pushData(final PushType type, final ByteSlot slot,
+ final long procId, final long[] subProcIds) {
if (!isRunning()) {
throw new RuntimeException("the store must be running before inserting data");
}
@@ -481,6 +460,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
+ updateStoreTracker(type, procId, subProcIds);
slots[slotIndex++] = slot;
logId = flushLogId;
@@ -509,20 +489,29 @@ public class WALProcedureStore extends ProcedureStoreBase {
return logId;
}
- private boolean isSyncAborted() {
- return syncException.get() != null;
+ private void updateStoreTracker(final PushType type,
+ final long procId, final long[] subProcIds) {
+ switch (type) {
+ case INSERT:
+ if (subProcIds == null) {
+ storeTracker.insert(procId);
+ } else {
+ storeTracker.insert(procId, subProcIds);
+ }
+ break;
+ case UPDATE:
+ storeTracker.update(procId);
+ break;
+ case DELETE:
+ storeTracker.delete(procId);
+ break;
+ default:
+ throw new RuntimeException("invalid push type " + type);
+ }
}
- protected void periodicRoll() throws IOException {
- long logId;
- boolean removeOldLogs;
- synchronized (storeTracker) {
- logId = flushLogId;
- removeOldLogs = storeTracker.isEmpty();
- }
- if (checkAndTryRoll() && removeOldLogs) {
- setInactiveLogsMaxId(logId);
- }
+ private boolean isSyncAborted() {
+ return syncException.get() != null;
}
private void syncLoop() throws Throwable {
@@ -534,7 +523,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
// Wait until new data is available
if (slotIndex == 0) {
if (!loading.get()) {
- removeInactiveLogs();
+ periodicRoll();
}
if (LOG.isTraceEnabled()) {
@@ -547,7 +536,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
if (slotIndex == 0) {
// no data.. probably a stop() or a periodic roll
- periodicRoll();
continue;
}
}
@@ -560,13 +548,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
float rollSec = getMillisFromLastRoll() / 1000.0f;
- LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s/sec",
+ LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
StringUtils.humanSize(totalSynced.get()),
StringUtils.humanSize(totalSynced.get() / rollSec)));
}
-
inSync.set(true);
totalSynced.addAndGet(syncSlots());
slotIndex = 0;
@@ -639,8 +626,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
return totalSynced;
}
- @VisibleForTesting
- public boolean rollWriterOrDie() {
+ private boolean rollWriterOrDie() {
for (int i = 1; i <= rollRetries; ++i) {
try {
if (rollWriter()) {
@@ -656,17 +642,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
throw new RuntimeException("unable to roll the log");
}
- protected boolean checkAndTryRoll() {
- if (!isRunning()) return false;
-
- if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
- try {
- return rollWriter();
- } catch (IOException e) {
- LOG.warn("Unable to roll the log", e);
- }
+ private boolean tryRollWriter() {
+ try {
+ return rollWriter();
+ } catch (IOException e) {
+ LOG.warn("Unable to roll the log", e);
+ return false;
}
- return false;
}
private long getMillisToNextPeriodicRoll() {
@@ -680,7 +662,52 @@ public class WALProcedureStore extends ProcedureStoreBase {
return (System.currentTimeMillis() - lastRollTs.get());
}
- protected boolean rollWriter() throws IOException {
+ @VisibleForTesting
+ protected void periodicRollForTesting() throws IOException {
+ lock.lock();
+ try {
+ periodicRoll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @VisibleForTesting
+ protected boolean rollWriterForTesting() throws IOException {
+ lock.lock();
+ try {
+ return rollWriter();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void periodicRoll() throws IOException {
+ if (storeTracker.isEmpty()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("no active procedures");
+ }
+ tryRollWriter();
+ removeAllLogs(flushLogId - 1);
+ } else {
+ if (storeTracker.isUpdated()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("all the active procedures are in the latest log");
+ }
+ removeAllLogs(flushLogId - 1);
+ }
+
+ // if the log size has exceeded the roll threshold
+ // or the periodic roll timeout is expired, try to roll the wal.
+ if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
+ tryRollWriter();
+ }
+
+ removeInactiveLogs();
+ }
+ }
+
+ private boolean rollWriter() throws IOException {
// Create new state-log
if (!rollWriter(flushLogId + 1)) {
LOG.warn("someone else has already created log " + flushLogId);
@@ -701,6 +728,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private boolean rollWriter(final long logId) throws IOException {
assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
+ assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
.setVersion(ProcedureWALFormat.HEADER_VERSION)
@@ -730,20 +758,16 @@ public class WALProcedureStore extends ProcedureStoreBase {
newStream.close();
return false;
}
- lock.lock();
- try {
- closeStream();
- synchronized (storeTracker) {
- storeTracker.resetUpdates();
- }
- stream = newStream;
- flushLogId = logId;
- totalSynced.set(0);
- lastRollTs.set(System.currentTimeMillis());
- logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
- } finally {
- lock.unlock();
- }
+
+ closeStream();
+
+ storeTracker.resetUpdates();
+ stream = newStream;
+ flushLogId = logId;
+ totalSynced.set(0);
+ lastRollTs.set(System.currentTimeMillis());
+ logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
+
if (LOG.isDebugEnabled()) {
LOG.debug("Roll new state log: " + logId);
}
@@ -754,11 +778,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
if (stream != null) {
try {
- synchronized (storeTracker) {
- ProcedureWALFile log = logs.getLast();
- log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
- ProcedureWALFormat.writeTrailer(stream, storeTracker);
- }
+ ProcedureWALFile log = logs.getLast();
+ log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
+ ProcedureWALFormat.writeTrailer(stream, storeTracker);
} catch (IOException e) {
LOG.warn("Unable to write the trailer: " + e.getMessage());
}
@@ -774,30 +796,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
// ==========================================================================
// Log Files cleaner helpers
// ==========================================================================
- private void setInactiveLogsMaxId(long logId) {
- long expect = 0;
- while (!inactiveLogsMaxId.compareAndSet(expect, logId)) {
- expect = inactiveLogsMaxId.get();
- if (expect >= logId) {
- break;
- }
- }
- }
-
private void removeInactiveLogs() {
- long lastLogId = inactiveLogsMaxId.get();
- if (lastLogId != 0) {
- removeAllLogs(lastLogId);
- inactiveLogsMaxId.compareAndSet(lastLogId, 0);
- }
-
// Verify if the ProcId of the first oldest is still active. if not remove the file.
while (logs.size() > 1) {
ProcedureWALFile log = logs.getFirst();
- synchronized (storeTracker) {
- if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
- break;
- }
+ if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
+ break;
}
removeLogFile(log);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/60d33ce3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index 534e56e..cae10ef 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -313,7 +313,7 @@ public class TestProcedureRecovery {
public void testRunningProcWithSameNonce() throws Exception {
final long nonceGroup = 456;
final long nonce = 33333;
- Procedure proc = new TestMultiStepProcedure();
+ Procedure proc = new TestSingleStepProcedure();
long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
// Restart (use a latch to prevent the step execution until we submitted proc2)
@@ -321,7 +321,7 @@ public class TestProcedureRecovery {
procEnv.setWaitLatch(latch);
restart();
// Submit a procedure with the same nonce and expect the same procedure would return.
- Procedure proc2 = new TestMultiStepProcedure();
+ Procedure proc2 = new TestSingleStepProcedure();
long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce);
latch.countDown();
procEnv.setWaitLatch(null);
http://git-wip-us.apache.org/repos/asf/hbase/blob/60d33ce3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
index 7d6eb1c..941803a 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -42,27 +41,6 @@ import static org.junit.Assert.fail;
public class TestProcedureStoreTracker {
private static final Log LOG = LogFactory.getLog(TestProcedureStoreTracker.class);
- static class TestProcedure extends Procedure<Void> {
- public TestProcedure(long procId) {
- setProcId(procId);
- }
-
- @Override
- protected Procedure[] execute(Void env) { return null; }
-
- @Override
- protected void rollback(Void env) { /* no-op */ }
-
- @Override
- protected boolean abort(Void env) { return false; }
-
- @Override
- protected void serializeStateData(final OutputStream stream) { /* no-op */ }
-
- @Override
- protected void deserializeStateData(final InputStream stream) { /* no-op */ }
- }
-
@Test
public void testSeqInsertAndDelete() {
ProcedureStoreTracker tracker = new ProcedureStoreTracker();
@@ -162,13 +140,10 @@ public class TestProcedureStoreTracker {
ProcedureStoreTracker tracker = new ProcedureStoreTracker();
assertTrue(tracker.isEmpty());
- Procedure[] procs = new TestProcedure[] {
- new TestProcedure(1), new TestProcedure(2), new TestProcedure(3),
- new TestProcedure(4), new TestProcedure(5), new TestProcedure(6),
- };
+ long[] procs = new long[] { 1, 2, 3, 4, 5, 6 };
- tracker.insert(procs[0], null);
- tracker.insert(procs[1], new Procedure[] { procs[2], procs[3], procs[4] });
+ tracker.insert(procs[0]);
+ tracker.insert(procs[1], new long[] { procs[2], procs[3], procs[4] });
assertFalse(tracker.isEmpty());
assertTrue(tracker.isUpdated());
@@ -190,11 +165,11 @@ public class TestProcedureStoreTracker {
assertTrue(tracker.isUpdated());
for (int i = 0; i < 5; ++i) {
- tracker.delete(procs[i].getProcId());
+ tracker.delete(procs[i]);
assertFalse(tracker.isEmpty());
assertTrue(tracker.isUpdated());
}
- tracker.delete(procs[5].getProcId());
+ tracker.delete(procs[5]);
assertTrue(tracker.isEmpty());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/60d33ce3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 35c8c78..6dbc3aa 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -102,7 +102,7 @@ public class TestWALProcedureStore {
@Test
public void testEmptyRoll() throws Exception {
for (int i = 0; i < 10; ++i) {
- procStore.periodicRoll();
+ procStore.periodicRollForTesting();
}
FileStatus[] status = fs.listStatus(logDir);
assertEquals(1, status.length);
@@ -214,14 +214,14 @@ public class TestWALProcedureStore {
procStore.update(rootProcs[i-1]);
}
// insert root-child txn
- procStore.rollWriter();
+ procStore.rollWriterForTesting();
for (int i = 1; i <= rootProcs.length; i++) {
TestProcedure b = new TestProcedure(rootProcs.length + i, i);
rootProcs[i-1].addStackId(1);
procStore.insert(rootProcs[i-1], new Procedure[] { b });
}
// insert child updates
- procStore.rollWriter();
+ procStore.rollWriterForTesting();
for (int i = 1; i <= rootProcs.length; i++) {
procStore.update(new TestProcedure(rootProcs.length + i, i));
}
@@ -229,9 +229,10 @@ public class TestWALProcedureStore {
// Stop the store
procStore.stop(false);
- // Remove 4 byte from the trailer
+ // the first log was removed,
+ // we have insert-txn and updates in the others so everything is fine
FileStatus[] logs = fs.listStatus(logDir);
- assertEquals(3, logs.length);
+ assertEquals(Arrays.toString(logs), 2, logs.length);
Arrays.sort(logs, new Comparator<FileStatus>() {
@Override
public int compare(FileStatus o1, FileStatus o2) {
@@ -239,15 +240,13 @@ public class TestWALProcedureStore {
}
});
- // Remove the first log, we have insert-txn and updates in the others so everything is fine.
- fs.delete(logs[0].getPath(), false);
LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(rootProcs.length * 2, loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount());
- // Remove the second log, we have lost any root/parent references
- fs.delete(logs[1].getPath(), false);
+ // Remove the second log, we have lost all the root/parent references
+ fs.delete(logs[0].getPath(), false);
loader.reset();
storeRestart(loader);
assertEquals(0, loader.getLoadedCount());
@@ -276,7 +275,7 @@ public class TestWALProcedureStore {
b.addStackId(1);
procStore.update(b);
- procStore.rollWriter();
+ procStore.rollWriterForTesting();
a.addStackId(2);
procStore.update(a);
@@ -325,7 +324,7 @@ public class TestWALProcedureStore {
b.addStackId(2);
procStore.update(b);
- procStore.rollWriter();
+ procStore.rollWriterForTesting();
b.addStackId(3);
procStore.update(b);
@@ -426,6 +425,36 @@ public class TestWALProcedureStore {
assertEquals(1, procStore.getActiveLogs().size());
}
+ @Test
+ public void testRollAndRemove() throws IOException {
+ // Insert something in the log
+ Procedure proc1 = new TestSequentialProcedure();
+ procStore.insert(proc1, null);
+
+ Procedure proc2 = new TestSequentialProcedure();
+ procStore.insert(proc2, null);
+
+ // roll the log, now we have 2
+ procStore.rollWriterForTesting();
+ assertEquals(2, procStore.getActiveLogs().size());
+
+ // everything will be up to date in the second log
+ // so we can remove the first one
+ procStore.update(proc1);
+ procStore.update(proc2);
+ assertEquals(1, procStore.getActiveLogs().size());
+
+ // roll the log, now we have 2
+ procStore.rollWriterForTesting();
+ assertEquals(2, procStore.getActiveLogs().size());
+
+ // remove everything active
+ // so we can remove all the logs
+ procStore.delete(proc1.getProcId());
+ procStore.delete(proc2.getProcId());
+ assertEquals(1, procStore.getActiveLogs().size());
+ }
+
private void corruptLog(final FileStatus logFile, final long dropBytes)
throws IOException {
assertTrue(logFile.getLen() > dropBytes);