You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by go...@apache.org on 2017/03/07 00:34:16 UTC
[3/9] incubator-tephra git commit: TEPHRA-223 Encapsulate the two
data structures used for invalid transactions to avoid update issues
TEPHRA-223 Encapsulate the two data structures used for invalid transactions to avoid update issues
This closes #37
Signed-off-by: poorna <po...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/872fb109
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/872fb109
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/872fb109
Branch: refs/heads/master
Commit: 872fb1090efadb3d62a75fa8f51b308511eea754
Parents: 10e36e6
Author: poorna <po...@cask.co>
Authored: Mon Feb 20 17:13:39 2017 -0800
Committer: poorna <po...@apache.org>
Committed: Tue Feb 21 17:46:33 2017 -0800
----------------------------------------------------------------------
.../org/apache/tephra/TransactionManager.java | 104 +++++++--------
.../apache/tephra/manager/InvalidTxList.java | 126 +++++++++++++++++++
.../tephra/manager/InvalidTxListTest.java | 110 ++++++++++++++++
.../AbstractTransactionStateStorageTest.java | 10 ++
4 files changed, 294 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 0b90d7f..f2060cd 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -29,7 +29,11 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractService;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongArraySet;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.manager.InvalidTxList;
import org.apache.tephra.metrics.DefaultMetricsCollector;
import org.apache.tephra.metrics.MetricsCollector;
import org.apache.tephra.persist.NoOpTransactionStateStorage;
@@ -46,7 +50,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -115,15 +118,11 @@ public class TransactionManager extends AbstractService {
//poll every 10 second to emit metrics
private static final long METRICS_POLL_INTERVAL = 10000L;
- private static final long[] NO_INVALID_TX = { };
-
// Transactions that are in progress, with their info.
private final NavigableMap<Long, InProgressTx> inProgress = new ConcurrentSkipListMap<Long, InProgressTx>();
// the list of transactions that are invalid (not properly committed/aborted, or timed out)
- // TODO: explain usage of two arrays
- private final LongArrayList invalid = new LongArrayList();
- private long[] invalidArray = NO_INVALID_TX;
+ private final InvalidTxList invalidTxList = new InvalidTxList();
// todo: use moving array instead (use Long2ObjectMap<byte[]> in fastutil)
// todo: should this be consolidated with inProgress?
@@ -198,8 +197,7 @@ public class TransactionManager extends AbstractService {
}
private void clear() {
- invalid.clear();
- invalidArray = NO_INVALID_TX;
+ invalidTxList.clear();
inProgress.clear();
committedChangeSets.clear();
committingChangeSets.clear();
@@ -318,7 +316,7 @@ public class TransactionManager extends AbstractService {
txMetricsCollector.gauge("committing.size", committingChangeSets.size());
txMetricsCollector.gauge("committed.size", committedChangeSets.size());
txMetricsCollector.gauge("inprogress.size", inProgress.size());
- txMetricsCollector.gauge("invalid.size", invalidArray.length);
+ txMetricsCollector.gauge("invalid.size", getInvalidSize());
}
@Override
@@ -327,7 +325,7 @@ public class TransactionManager extends AbstractService {
txMetricsCollector.gauge("committing.size", committingChangeSets.size());
txMetricsCollector.gauge("committed.size", committedChangeSets.size());
txMetricsCollector.gauge("inprogress.size", inProgress.size());
- txMetricsCollector.gauge("invalid.size", invalidArray.length);
+ txMetricsCollector.gauge("invalid.size", getInvalidSize());
}
@Override
@@ -363,7 +361,7 @@ public class TransactionManager extends AbstractService {
}
if (!timedOut.isEmpty()) {
invalidEdits = Lists.newArrayListWithCapacity(timedOut.size());
- invalid.addAll(timedOut.keySet());
+ invalidTxList.addAll(timedOut.keySet());
for (Map.Entry<Long, InProgressType> tx : timedOut.entrySet()) {
inProgress.remove(tx.getKey());
// checkpoints never go into the committing change sets or the edits
@@ -373,9 +371,6 @@ public class TransactionManager extends AbstractService {
}
}
- // todo: find a more efficient way to keep this sorted. Could it just be an array?
- Collections.sort(invalid);
- invalidArray = invalid.toLongArray();
LOG.info("Invalidated {} transactions due to timeout.", timedOut.size());
}
}
@@ -468,7 +463,8 @@ public class TransactionManager extends AbstractService {
public synchronized TransactionSnapshot getCurrentState() {
return TransactionSnapshot.copyFrom(System.currentTimeMillis(), readPointer, lastWritePointer,
- invalid, inProgress, committingChangeSets, committedChangeSets);
+ invalidTxList.toRawList(), inProgress, committingChangeSets,
+ committedChangeSets);
}
public synchronized void recoverState() {
@@ -497,7 +493,7 @@ public class TransactionManager extends AbstractService {
Preconditions.checkState(lastSnapshotTime == 0, "lastSnapshotTime has been set!");
Preconditions.checkState(readPointer == 0, "readPointer has been set!");
Preconditions.checkState(lastWritePointer == 0, "lastWritePointer has been set!");
- Preconditions.checkState(invalid.isEmpty(), "invalid list should be empty!");
+ Preconditions.checkState(invalidTxList.isEmpty(), "invalid list should be empty!");
Preconditions.checkState(inProgress.isEmpty(), "inProgress map should be empty!");
Preconditions.checkState(committingChangeSets.isEmpty(), "committingChangeSets should be empty!");
Preconditions.checkState(committedChangeSets.isEmpty(), "committedChangeSets should be empty!");
@@ -506,7 +502,7 @@ public class TransactionManager extends AbstractService {
lastSnapshotTime = snapshot.getTimestamp();
readPointer = snapshot.getReadPointer();
lastWritePointer = snapshot.getWritePointer();
- invalid.addAll(snapshot.getInvalid());
+ invalidTxList.addAll(snapshot.getInvalid());
inProgress.putAll(txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress()));
committingChangeSets.putAll(snapshot.getCommittingChangeSets());
committedChangeSets.putAll(snapshot.getCommittedChangeSets());
@@ -818,14 +814,17 @@ public class TransactionManager extends AbstractService {
txMetricsCollector.rate("canCommit");
Stopwatch timer = new Stopwatch().start();
if (inProgress.get(tx.getTransactionId()) == null) {
- // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
- if (invalid.contains(tx.getTransactionId())) {
- throw new TransactionNotInProgressException(
- String.format("canCommit() is called for transaction %d that is not in progress (it is known to be invalid)",
- tx.getTransactionId()));
- } else {
- throw new TransactionNotInProgressException(
- String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId()));
+ synchronized (this) {
+ // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
+ if (invalidTxList.contains(tx.getTransactionId())) {
+ throw new TransactionNotInProgressException(
+ String.format(
+ "canCommit() is called for transaction %d that is not in progress (it is known to be invalid)",
+ tx.getTransactionId()));
+ } else {
+ throw new TransactionNotInProgressException(
+ String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId()));
+ }
}
}
@@ -872,7 +871,7 @@ public class TransactionManager extends AbstractService {
commitPointer = lastWritePointer + 1;
if (inProgress.get(tx.getTransactionId()) == null) {
// invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
- if (invalid.contains(tx.getTransactionId())) {
+ if (invalidTxList.contains(tx.getTransactionId())) {
throw new TransactionNotInProgressException(
String.format("canCommit() is called for transaction %d that is not in progress " +
"(it is known to be invalid)", tx.getTransactionId()));
@@ -928,8 +927,7 @@ public class TransactionManager extends AbstractService {
InProgressTx previous = inProgress.remove(transactionId);
if (previous == null) {
// tx was not in progress! perhaps it timed out and is invalid? try to remove it there.
- if (invalid.rem(transactionId)) {
- invalidArray = invalid.toLongArray();
+ if (invalidTxList.remove(transactionId)) {
LOG.info("Tx invalid list: removed committed tx {}", transactionId);
}
} else {
@@ -983,17 +981,16 @@ public class TransactionManager extends AbstractService {
boolean removeInProgressCheckpoints = true;
if (removed == null) {
// tx was not in progress! perhaps it timed out and is invalid? try to remove it there.
- if (invalid.rem(writePointer)) {
+ if (invalidTxList.remove(writePointer)) {
// the tx and all its children were invalidated: no need to remove them from inProgress
removeInProgressCheckpoints = false;
// remove any invalidated checkpoint pointers
// this will only be present if the parent write pointer was also invalidated
if (checkpointWritePointers != null) {
for (long checkpointWritePointer : checkpointWritePointers) {
- invalid.rem(checkpointWritePointer);
+ invalidTxList.remove(checkpointWritePointer);
}
}
- invalidArray = invalid.toLongArray();
LOG.info("Tx invalid list: removed aborted tx {}", writePointer);
}
}
@@ -1032,21 +1029,18 @@ public class TransactionManager extends AbstractService {
// This check is to prevent from invalidating committed transactions
if (previous != null || previousChangeSet != null) {
// add tx to invalids
- invalid.add(writePointer);
+ invalidTxList.add(writePointer);
if (previous == null) {
LOG.debug("Invalidating tx {} in committing change sets but not in-progress", writePointer);
} else {
// invalidate any checkpoint write pointers
LongArrayList childWritePointers = previous.getCheckpointWritePointers();
if (!childWritePointers.isEmpty()) {
- invalid.addAll(childWritePointers);
+ invalidTxList.addAll(childWritePointers);
inProgress.keySet().removeAll(childWritePointers);
}
}
LOG.info("Tx invalid list: added tx {} because of invalidate", writePointer);
- // todo: find a more efficient way to keep this sorted. Could it just be an array?
- Collections.sort(invalid);
- invalidArray = invalid.toLongArray();
if (previous != null && !previous.isLongRunning()) {
// tx was short-running: must move read pointer
moveReadPointerIfNeeded(writePointer);
@@ -1080,13 +1074,9 @@ public class TransactionManager extends AbstractService {
}
}
- private boolean doTruncateInvalidTx(Set<Long> invalidTxIds) {
- LOG.info("Removing tx ids {} from invalid list", invalidTxIds);
- boolean success = invalid.removeAll(invalidTxIds);
- if (success) {
- invalidArray = invalid.toLongArray();
- }
- return success;
+ private boolean doTruncateInvalidTx(Set<Long> toRemove) {
+ LOG.info("Removing tx ids {} from invalid list", toRemove);
+ return invalidTxList.removeAll(toRemove);
}
/**
@@ -1123,15 +1113,16 @@ public class TransactionManager extends AbstractService {
}
// Find all invalid transactions earlier than truncateWp
- Set<Long> toTruncate = Sets.newHashSet();
- for (long wp : invalid) {
- // invalid list is sorted, hence can stop as soon as we reach a wp >= truncateWp
- if (wp >= truncateWp) {
- break;
+ LongSet toTruncate = new LongArraySet();
+ LongIterator it = invalidTxList.toRawList().iterator();
+ while (it.hasNext()) {
+ long wp = it.nextLong();
+ if (wp < truncateWp) {
+ toTruncate.add(wp);
}
- toTruncate.add(wp);
}
- return doTruncateInvalidTx(toTruncate);
+ LOG.info("Removing tx ids {} from invalid list", toTruncate);
+ return invalidTxList.removeAll(toTruncate);
}
public Transaction checkpoint(Transaction originalTx) throws TransactionNotInProgressException {
@@ -1149,7 +1140,7 @@ public class TransactionManager extends AbstractService {
// check that the parent tx is in progress
InProgressTx parentTx = inProgress.get(txId);
if (parentTx == null) {
- if (invalid.contains(txId)) {
+ if (invalidTxList.contains(txId)) {
throw new TransactionNotInProgressException(
String.format("Transaction %d is not in progress because it was invalidated", txId));
} else {
@@ -1184,14 +1175,14 @@ public class TransactionManager extends AbstractService {
// hack for exposing important metric
public int getExcludedListSize() {
- return invalid.size() + inProgress.size();
+ return getInvalidSize() + inProgress.size();
}
/**
* @return the size of invalid list
*/
- public int getInvalidSize() {
- return this.invalid.size();
+ public synchronized int getInvalidSize() {
+ return this.invalidTxList.size();
}
int getCommittedSize() {
@@ -1254,7 +1245,8 @@ public class TransactionManager extends AbstractService {
firstShortTx = txId;
}
}
- return new Transaction(readPointer, writePointer, invalidArray, inProgressIds.toLongArray(), firstShortTx, type);
+ return new Transaction(readPointer, writePointer, invalidTxList.toSortedArray(),
+ inProgressIds.toLongArray(), firstShortTx, type);
}
private void appendToLog(TransactionEdit edit) {
@@ -1285,7 +1277,7 @@ public class TransactionManager extends AbstractService {
*/
public void logStatistics() {
LOG.info("Transaction Statistics: write pointer = " + lastWritePointer +
- ", invalid = " + invalid.size() +
+ ", invalid = " + getInvalidSize() +
", in progress = " + inProgress.size() +
", committing = " + committingChangeSets.size() +
", committed = " + committedChangeSets.size());
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
new file mode 100644
index 0000000..231196c
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.manager;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+import it.unimi.dsi.fastutil.longs.LongLists;
+import org.apache.tephra.TransactionManager;
+
+import java.util.Arrays;
+import java.util.Collection;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * This is an internal class used by the {@link TransactionManager} to store invalid transaction ids.
+ * This class uses both a list and an array to keep track of the invalid ids. The list is the primary
+ * data structure for storing the invalid ids. The array is populated lazily on changes to the list.
+ * The array is used to avoid creating a new array every time method {@link #toSortedArray()} is invoked.
+ *
+ * This class is not thread safe and relies on external synchronization. TransactionManager always
+ * accesses an instance of this class after synchronization.
+ */
+@NotThreadSafe
+public class InvalidTxList {
+ private static final long[] NO_INVALID_TX = { };
+
+ private final LongList invalid = new LongArrayList();
+ private long[] invalidArray = NO_INVALID_TX;
+
+ private boolean dirty = false; // used to track changes to the invalid list
+
+ public int size() {
+ return invalid.size();
+ }
+
+ public boolean isEmpty() {
+ return invalid.isEmpty();
+ }
+
+ public boolean add(long id) {
+ boolean changed = invalid.add(id);
+ dirty = dirty || changed;
+ return changed;
+ }
+
+ public boolean addAll(Collection<? extends Long> ids) {
+ boolean changed = invalid.addAll(ids);
+ dirty = dirty || changed;
+ return changed;
+ }
+
+ public boolean addAll(LongList ids) {
+ boolean changed = invalid.addAll(ids);
+ dirty = dirty || changed;
+ return changed;
+ }
+
+ public boolean contains(long id) {
+ return invalid.contains(id);
+ }
+
+ public boolean remove(long id) {
+ boolean changed = invalid.rem(id);
+ dirty = dirty || changed;
+ return changed;
+ }
+
+ public boolean removeAll(Collection<? extends Long> ids) {
+ boolean changed = invalid.removeAll(ids);
+ dirty = dirty || changed;
+ return changed;
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public boolean removeAll(LongList ids) {
+ boolean changed = invalid.removeAll(ids);
+ dirty = dirty || changed;
+ return changed;
+ }
+
+ public void clear() {
+ invalid.clear();
+ invalidArray = NO_INVALID_TX;
+ dirty = false;
+ }
+
+ /**
+ * @return sorted array of invalid transactions
+ */
+ public long[] toSortedArray() {
+ lazyUpdate();
+ return invalidArray;
+ }
+
+ /**
+ * @return list of invalid transactions. The list is not sorted.
+ */
+ public LongList toRawList() {
+ return LongLists.unmodifiable(invalid);
+ }
+
+ private void lazyUpdate() {
+ if (dirty) {
+ invalidArray = invalid.toLongArray();
+ Arrays.sort(invalidArray);
+ dirty = false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java b/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java
new file mode 100644
index 0000000..4f45072
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.manager;
+
+import com.google.common.collect.ImmutableList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class InvalidTxListTest {
+
+ @Test
+ public void testInvalidTxList() {
+ InvalidTxList invalidTxList = new InvalidTxList();
+ // Assert that the list is empty at the beginning
+ Assert.assertTrue(invalidTxList.isEmpty());
+ Assert.assertEquals(0, invalidTxList.size());
+ Assert.assertEquals(ImmutableList.of(), invalidTxList.toRawList());
+ Assert.assertArrayEquals(new long[0], invalidTxList.toSortedArray());
+
+ // Try removing something from the empty list
+ Assert.assertFalse(invalidTxList.remove(3));
+ Assert.assertFalse(invalidTxList.removeAll(ImmutableList.of(5L, 9L)));
+
+ // verify contains
+ Assert.assertFalse(invalidTxList.contains(3));
+
+ // Add some elements to the list
+ invalidTxList.add(3);
+ invalidTxList.add(1);
+ invalidTxList.add(8);
+ invalidTxList.add(5);
+
+ // verify contains
+ Assert.assertTrue(invalidTxList.contains(3));
+
+ // Assert the newly added elements
+ Assert.assertFalse(invalidTxList.isEmpty());
+ Assert.assertEquals(4, invalidTxList.size());
+ Assert.assertEquals(ImmutableList.of(3L, 1L, 8L, 5L), invalidTxList.toRawList());
+ Assert.assertArrayEquals(new long[] {1, 3, 5, 8}, invalidTxList.toSortedArray());
+
+ // Add a collection of elements
+ invalidTxList.addAll(ImmutableList.of(7L, 10L, 4L, 2L));
+
+ // Assert the newly added elements
+ Assert.assertFalse(invalidTxList.isEmpty());
+ Assert.assertEquals(8, invalidTxList.size());
+ Assert.assertEquals(ImmutableList.of(3L, 1L, 8L, 5L, 7L, 10L, 4L, 2L), invalidTxList.toRawList());
+ Assert.assertArrayEquals(new long[] {1, 2, 3, 4, 5, 7, 8, 10}, invalidTxList.toSortedArray());
+
+ // Remove elements that are not present
+ Assert.assertFalse(invalidTxList.remove(6));
+ Assert.assertFalse(invalidTxList.removeAll(ImmutableList.of(9L, 11L)));
+
+ // Remove a collection of elements
+ Assert.assertTrue(invalidTxList.removeAll(ImmutableList.of(8L, 4L, 2L)));
+ // This time check the array first and then check the list
+ Assert.assertArrayEquals(new long[] {1, 3, 5, 7, 10}, invalidTxList.toSortedArray());
+ Assert.assertEquals(ImmutableList.of(3L, 1L, 5L, 7L, 10L), invalidTxList.toRawList());
+
+ // Remove a single element
+ Assert.assertTrue(invalidTxList.remove(5));
+ Assert.assertArrayEquals(new long[] {1, 3, 7, 10}, invalidTxList.toSortedArray());
+ Assert.assertEquals(ImmutableList.of(3L, 1L, 7L, 10L), invalidTxList.toRawList());
+
+ // Add a LongCollection
+ invalidTxList.addAll(new LongArrayList(new long[] {15, 12, 13}));
+
+ // Assert the newly added elements
+ Assert.assertEquals(7, invalidTxList.size());
+ Assert.assertArrayEquals(new long[] {1, 3, 7, 10, 12, 13, 15}, invalidTxList.toSortedArray());
+ Assert.assertEquals(ImmutableList.of(3L, 1L, 7L, 10L, 15L, 12L, 13L), invalidTxList.toRawList());
+
+ // Remove a LongCollection
+ invalidTxList.removeAll(new LongArrayList(new long[] {3, 7, 12}));
+
+ // Assert removals
+ Assert.assertEquals(4, invalidTxList.size());
+ Assert.assertArrayEquals(new long[] {1, 10, 13, 15}, invalidTxList.toSortedArray());
+ Assert.assertEquals(ImmutableList.of(1L, 10L, 15L, 13L), invalidTxList.toRawList());
+
+ // Clear the list
+ invalidTxList.clear();
+ Assert.assertTrue(invalidTxList.isEmpty());
+ Assert.assertEquals(0, invalidTxList.size());
+ Assert.assertArrayEquals(new long[0], invalidTxList.toSortedArray());
+ Assert.assertEquals(ImmutableList.of(), invalidTxList.toRawList());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
index 21090c5..ec06528 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.ChangeId;
@@ -137,6 +138,9 @@ public abstract class AbstractTransactionStateStorageTest {
// TODO: replace with new persistence tests
final byte[] a = { 'a' };
final byte[] b = { 'b' };
+ // Start and invalidate a transaction
+ Transaction invalid = txManager.startShort();
+ txManager.invalidate(invalid.getTransactionId());
// start a tx1, add a change A and commit
Transaction tx1 = txManager.startShort();
Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
@@ -162,6 +166,12 @@ public abstract class AbstractTransactionStateStorageTest {
LOG.info("New state: " + newState);
assertEquals(origState, newState);
+ // Verify that the invalid transaction list matches
+ Transaction checkTx = txManager.startShort();
+ Assert.assertEquals(origState.getInvalid(), Longs.asList(checkTx.getInvalids()));
+ txManager.abort(checkTx);
+ txManager.abort(invalid);
+
// commit tx2
Assert.assertTrue(txManager.commit(tx2));
// start another transaction, must be greater than tx3