You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/12 23:23:50 UTC
[2/4] incubator-tephra git commit: (TEPHRA-240) Include conflicting
key and client id in TransactionConflictException
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
index dd17431..e33cd2c 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
@@ -33,9 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* Implementation of the tx system client that doesn't talk to any global service and tries to do its best to meet the
- * tx system requirements/expectations. In fact it implements enough logic to support running flows (when each flowlet
- * uses its own detached tx system client, without talking to each other and sharing any state) with "process exactly
- * once" guarantee if no failures happen.
+ * tx system requirements/expectations.
*
* NOTE: Will NOT detect conflicts. May leave inconsistent state when process crashes. Does NOT provide even read
* isolation guarantees.
@@ -43,7 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
* Good for performance testing. For demoing high throughput. For use-cases with relaxed tx guarantees.
*/
public class DetachedTxSystemClient implements TransactionSystemClient {
- // Dataset and queue logic relies on tx id to grow monotonically even after restart. Hence we need to start with
+ // Client logic may rely on tx id to grow monotonically even after restart. Hence we need to start with a
// value that is for sure bigger than the last one used before restart.
// NOTE: with code below we assume we don't do more than InMemoryTransactionManager.MAX_TX_PER_MS tx/ms
// by single client
@@ -88,8 +86,8 @@ public class DetachedTxSystemClient implements TransactionSystemClient {
}
@Override
- public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
- return true;
+ public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
+ // do nothing
}
@Override
@@ -98,6 +96,11 @@ public class DetachedTxSystemClient implements TransactionSystemClient {
}
@Override
+ public void commitOrThrow(Transaction tx) {
+ // do nothing
+ }
+
+ @Override
public void abort(Transaction tx) {
// do nothing
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
index 9e57de8..54615fc 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
@@ -18,6 +18,7 @@
package org.apache.tephra.inmemory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.tephra.InvalidTruncateTimeException;
import org.apache.tephra.Transaction;
@@ -25,7 +26,6 @@ import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionNotInProgressException;
-import org.apache.tephra.TransactionSizeException;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TxConstants;
import org.slf4j.Logger;
@@ -45,6 +45,7 @@ public class InMemoryTxSystemClient implements TransactionSystemClient {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryTxSystemClient.class);
+ @VisibleForTesting
TransactionManager txManager;
@Inject
@@ -70,20 +71,34 @@ public class InMemoryTxSystemClient implements TransactionSystemClient {
@Override
public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
try {
- return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
- } catch (TransactionSizeException e) {
+ canCommitOrThrow(tx, changeIds);
+ return true;
+ } catch (TransactionFailureException e) {
return false;
}
}
@Override
- public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException {
- return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
+ public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds)
+ throws TransactionFailureException {
+ if (!changeIds.isEmpty()) {
+ txManager.canCommit(tx.getTransactionId(), changeIds);
+ }
}
@Override
public boolean commit(Transaction tx) throws TransactionNotInProgressException {
- return txManager.commit(tx);
+ try {
+ commitOrThrow(tx);
+ return true;
+ } catch (TransactionFailureException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void commitOrThrow(Transaction tx) throws TransactionFailureException {
+ txManager.commit(tx.getTransactionId(), tx.getWritePointer());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
index de46f27..558c1ea 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
@@ -61,8 +61,8 @@ public class MinimalTxSystemClient implements TransactionSystemClient {
}
@Override
- public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
- return true;
+ public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
+ // do nothing
}
@Override
@@ -71,6 +71,11 @@ public class MinimalTxSystemClient implements TransactionSystemClient {
}
@Override
+ public void commitOrThrow(Transaction tx) {
+ // do nothing
+ }
+
+ @Override
public void abort(Transaction tx) {
// do nothing
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
index d76a98f..c31d156 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
@@ -197,8 +197,8 @@ public class TransactionSnapshot implements TransactionVisibilityState {
public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer,
long writePointer, InvalidTxList invalidTxList,
NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
- Map<Long, Set<ChangeId>> committing,
- NavigableMap<Long, Set<ChangeId>> committed) {
+ Map<Long, TransactionManager.ChangeSet> committing,
+ NavigableMap<Long, TransactionManager.ChangeSet> committed) {
// copy invalid IDs, after sorting
Collection<Long> invalidCopy = new LongArrayList(invalidTxList.toSortedArray());
// copy in-progress IDs and expirations
@@ -206,13 +206,13 @@ public class TransactionSnapshot implements TransactionVisibilityState {
// for committing and committed maps, we need to copy each individual Set as well to prevent modification
Map<Long, Set<ChangeId>> committingCopy = Maps.newHashMap();
- for (Map.Entry<Long, Set<ChangeId>> entry : committing.entrySet()) {
- committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
+ for (Map.Entry<Long, TransactionManager.ChangeSet> entry : committing.entrySet()) {
+ committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue().getChangeIds()));
}
NavigableMap<Long, Set<ChangeId>> committedCopy = new TreeMap<>();
- for (Map.Entry<Long, Set<ChangeId>> entry : committed.entrySet()) {
- committedCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
+ for (Map.Entry<Long, TransactionManager.ChangeSet> entry : committed.entrySet()) {
+ committedCopy.put(entry.getKey(), new HashSet<>(entry.getValue().getChangeIds()));
}
return new TransactionSnapshot(snapshotTime, readPointer, writePointer,
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/thrift/transaction.thrift
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift
index 729e035..6cbfdad 100644
--- a/tephra-core/src/main/thrift/transaction.thrift
+++ b/tephra-core/src/main/thrift/transaction.thrift
@@ -43,6 +43,12 @@ struct TTransaction {
9: TVisibilityLevel visibilityLevel
}
+exception TTransactionConflictException {
+ 1: i64 transactionId,
+ 2: string conflictingKey,
+ 3: string conflictingClient
+}
+
exception TTransactionNotInProgressException {
1: string message
}
@@ -73,15 +79,21 @@ service TTransactionServer {
// TODO remove this as it was replaced with startShortWithTimeout in 0.10
TTransaction startShortTimeout(1: i32 timeout),
TTransaction startShortClientId(1: string clientId) throws (1: TGenericException e),
- TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws (1:TGenericException e),
- TTransaction startShortWithTimeout(1: i32 timeout) throws (1:TGenericException e),
- TBoolean canCommitTx(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException e),
- TBoolean canCommitOrThrow(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException e,
- 2:TGenericException g,),
+ TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws (1: TGenericException e),
+ TTransaction startShortWithTimeout(1: i32 timeout) throws (1: TGenericException e),
+ // TODO remove this as it was replaced with canCommitOrThrow in 0.13
+ TBoolean canCommitTx(1: TTransaction tx, 2: set<binary> changes) throws (1: TTransactionNotInProgressException e),
+ void canCommitOrThrow(1: i64 tx, 2: set<binary> changes) throws (1: TTransactionNotInProgressException e,
+ 2: TTransactionConflictException c,
+ 3: TGenericException g),
+ // TODO remove this as it was replaced with commitOrThrow in 0.13
TBoolean commitTx(1: TTransaction tx) throws (1:TTransactionNotInProgressException e),
+ void commitOrThrow(1: i64 txId, 2: i64 wp) throws (1: TTransactionNotInProgressException e,
+ 2: TTransactionConflictException c,
+ 3: TGenericException g),
void abortTx(1: TTransaction tx),
- bool invalidateTx(1: i64 tx),
- binary getSnapshot() throws (1:TTransactionCouldNotTakeSnapshotException e),
+ bool invalidateTx(1: i64 txid),
+ binary getSnapshot() throws (1: TTransactionCouldNotTakeSnapshotException e),
void resetState(),
string status(),
TBoolean truncateInvalidTx(1: set<i64> txns),
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java b/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java
new file mode 100644
index 0000000..1b34255
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test the retention of client ids in Tx Manager
+ */
+@SuppressWarnings("WeakerAccess")
+public class ClientIdTest {
+
+ @Test
+ public void testClientIdRetention() throws TransactionFailureException {
+ testClientIdRetention(TransactionManager.ClientIdRetention.OFF, false, false);
+ testClientIdRetention(TransactionManager.ClientIdRetention.ACTIVE, true, false);
+ testClientIdRetention(TransactionManager.ClientIdRetention.COMMITTED, true, true);
+ }
+
+ private void testClientIdRetention(TransactionManager.ClientIdRetention retention,
+ boolean expectClientIdInProgress,
+ boolean expectClientIdCommitted) throws TransactionFailureException {
+ Configuration conf = new Configuration();
+ conf.set(TxConstants.Manager.CFG_TX_RETAIN_CLIENT_ID, retention.toString());
+ TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
+ TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+ txManager.startAndWait();
+ try {
+ testConflict(txManager, expectClientIdInProgress, expectClientIdCommitted);
+ } finally {
+ txManager.stopAndWait();
+ }
+ }
+
+ public void testConflict(TransactionManager txManager,
+ boolean expectClientIdInProgress,
+ boolean expectClientIdCommitted) throws TransactionFailureException {
+ testConflict(txManager, expectClientIdInProgress, expectClientIdCommitted, true);
+ testConflict(txManager, expectClientIdInProgress, expectClientIdCommitted, false);
+ }
+
+ /**
+ * Tests two conflicting transactions.
+ * The resulting exception must carry the conflicting change key and client id.
+ *
+ * @param expectClientIdInProgress whether to expect client id in in-progress transactions
+ * @param expectClientIdCommitted whether to expect client id in committed change sets
+ * @param testCanCommit whether the conflict should be induced by canCommit() or by commit()
+ */
+ public void testConflict(TransactionManager txManager,
+ boolean expectClientIdInProgress,
+ boolean expectClientIdCommitted,
+ boolean testCanCommit) throws TransactionFailureException {
+ // start two transactions, validate client id
+ Transaction tx1 = txManager.startShort("clientA");
+ Transaction tx2 = txManager.startShort("clientB");
+ TransactionManager.InProgressTx inProgressTx1 = txManager.getInProgress(tx1.getTransactionId());
+ Assert.assertNotNull(inProgressTx1);
+ if (expectClientIdInProgress) {
+ Assert.assertEquals("clientA", inProgressTx1.getClientId());
+ } else {
+ Assert.assertNull(inProgressTx1.getClientId());
+ }
+
+ // now commit the two transactions with overlapping change sets to create a conflict
+ final byte[] change1 = new byte[] { '1' };
+ final byte[] change2 = new byte[] { '2' };
+ final byte[] change3 = new byte[] { '3' };
+ if (!testCanCommit) {
+ txManager.canCommit(tx2.getTransactionId(), ImmutableList.of(change2, change3));
+ }
+ txManager.canCommit(tx1.getTransactionId(), ImmutableList.of(change1, change2));
+ txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
+ try {
+ if (testCanCommit) {
+ txManager.canCommit(tx2.getTransactionId(), ImmutableList.of(change2, change3));
+ } else {
+ txManager.commit(tx2.getTransactionId(), tx2.getWritePointer());
+ }
+ Assert.fail("canCommit() should have failed with conflict");
+ } catch (TransactionConflictException e) {
+ Assert.assertNotNull(e.getTransactionId());
+ Assert.assertEquals(tx2.getTransactionId(), e.getTransactionId().longValue());
+ Assert.assertEquals("2", e.getConflictingKey());
+ if (expectClientIdCommitted) {
+ Assert.assertEquals("clientA", e.getConflictingClient());
+ } else {
+ Assert.assertNull(e.getConflictingClient());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
new file mode 100644
index 0000000..54e8a8c
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.Collection;
+import java.util.List;
+
+class DummyTxAware implements TransactionAware {
+
+ enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
+
+ boolean started = false;
+ boolean committed = false;
+ boolean checked = false;
+ boolean rolledBack = false;
+ boolean postCommitted = false;
+ private List<byte[]> changes = Lists.newArrayList();
+
+ InduceFailure failStartTxOnce = InduceFailure.NoFailure;
+ InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
+ InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
+ InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
+ InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
+
+ void addChange(byte[] key) {
+ changes.add(key);
+ }
+
+ void reset() {
+ started = false;
+ checked = false;
+ committed = false;
+ rolledBack = false;
+ postCommitted = false;
+ changes.clear();
+ }
+
+ @Override
+ public void startTx(Transaction tx) {
+ reset();
+ started = true;
+ if (failStartTxOnce == InduceFailure.ThrowException) {
+ failStartTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("start failure");
+ }
+ }
+
+ @Override
+ public void updateTx(Transaction tx) {
+ // do nothing
+ }
+
+ @Override
+ public Collection<byte[]> getTxChanges() {
+ checked = true;
+ if (failChangesTxOnce == InduceFailure.ThrowException) {
+ failChangesTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("changes failure");
+ }
+ return ImmutableList.copyOf(changes);
+ }
+
+ @Override
+ public boolean commitTx() throws Exception {
+ committed = true;
+ if (failCommitTxOnce == InduceFailure.ThrowException) {
+ failCommitTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("persist failure");
+ }
+ if (failCommitTxOnce == InduceFailure.ReturnFalse) {
+ failCommitTxOnce = InduceFailure.NoFailure;
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void postTxCommit() {
+ postCommitted = true;
+ if (failPostCommitTxOnce == InduceFailure.ThrowException) {
+ failPostCommitTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("post failure");
+ }
+ }
+
+ @Override
+ public boolean rollbackTx() throws Exception {
+ rolledBack = true;
+ if (failRollbackTxOnce == InduceFailure.ThrowException) {
+ failRollbackTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("rollback failure");
+ }
+ if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
+ failRollbackTxOnce = InduceFailure.NoFailure;
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String getTransactionAwareName() {
+ return "dummy";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java b/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java
new file mode 100644
index 0000000..0321b17
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.inject.Inject;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+
+import java.util.Collection;
+
+class DummyTxClient extends InMemoryTxSystemClient {
+
+ boolean failCanCommitOnce = false;
+ int failCommits = 0;
+ enum CommitState {
+ Started, Committed, Aborted, Invalidated
+ }
+ CommitState state = CommitState.Started;
+
+ @Inject
+ DummyTxClient(TransactionManager txmgr) {
+ super(txmgr);
+ }
+
+ @Override
+ public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds)
+ throws TransactionFailureException {
+ if (failCanCommitOnce) {
+ failCanCommitOnce = false;
+ throw new TransactionConflictException(tx.getTransactionId(), "<unknown>", null);
+ } else {
+ super.canCommitOrThrow(tx, changeIds);
+ }
+ }
+
+ @Override
+ public void commitOrThrow(Transaction tx)
+ throws TransactionFailureException {
+ if (failCommits-- > 0) {
+ throw new TransactionConflictException(tx.getTransactionId(), "<unknown>", null);
+ } else {
+ state = CommitState.Committed;
+ super.commitOrThrow(tx);
+ }
+ }
+
+ @Override
+ public Transaction startLong() {
+ state = CommitState.Started;
+ return super.startLong();
+ }
+
+ @Override
+ public Transaction startShort() {
+ state = CommitState.Started;
+ return super.startShort();
+ }
+
+ @Override
+ public Transaction startShort(int timeout) {
+ state = CommitState.Started;
+ return super.startShort(timeout);
+ }
+
+ @Override
+ public void abort(Transaction tx) {
+ state = CommitState.Aborted;
+ super.abort(tx);
+ }
+
+ @Override
+ public boolean invalidate(long tx) {
+ state = CommitState.Invalidated;
+ return super.invalidate(tx);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
index 5f4675b..fcf793e 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
@@ -18,16 +18,12 @@
package org.apache.tephra;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
-import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.util.Modules;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tephra.inmemory.InMemoryTxSystemClient;
import org.apache.tephra.runtime.ConfigModule;
import org.apache.tephra.runtime.DiscoveryModules;
import org.apache.tephra.runtime.TransactionModules;
@@ -40,8 +36,6 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
/**
* Tests the transaction executor.
@@ -74,10 +68,10 @@ public class TransactionContextTest {
txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class);
}
- final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
+ private final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
- static final byte[] A = { 'a' };
- static final byte[] B = { 'b' };
+ private static final byte[] A = { 'a' };
+ private static final byte[] B = { 'b' };
private static TransactionContext newTransactionContext(TransactionAware... txAwares) {
return new TransactionContext(txClient, txAwares);
@@ -115,7 +109,7 @@ public class TransactionContextTest {
@Test
public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
- ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
+ ds1.failPostCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
TransactionContext context = newTransactionContext(ds1, ds2);
// start transaction
context.start();
@@ -145,7 +139,7 @@ public class TransactionContextTest {
@Test
public void testPersistFailure() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ThrowException;
+ ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
TransactionContext context = newTransactionContext(ds1, ds2);
// start transaction
context.start();
@@ -175,7 +169,7 @@ public class TransactionContextTest {
@Test
public void testPersistFalse() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+ ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
TransactionContext context = newTransactionContext(ds1, ds2);
// start transaction
context.start();
@@ -205,8 +199,8 @@ public class TransactionContextTest {
@Test
public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ThrowException;
- ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+ ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException;
TransactionContext context = newTransactionContext(ds1, ds2);
// start transaction
context.start();
@@ -236,8 +230,8 @@ public class TransactionContextTest {
@Test
public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
- ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+ ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
+ ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
TransactionContext context = newTransactionContext(ds1, ds2);
// start transaction
context.start();
@@ -327,8 +321,8 @@ public class TransactionContextTest {
@Test
public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
- ds1.failChangesTxOnce = InduceFailure.ThrowException;
- ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+ ds1.failChangesTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException;
TransactionContext context = newTransactionContext(ds1, ds2);
// start transaction
context.start();
@@ -358,7 +352,7 @@ public class TransactionContextTest {
@Test
public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
- ds1.failStartTxOnce = InduceFailure.ThrowException;
+ ds1.failStartTxOnce = DummyTxAware.InduceFailure.ThrowException;
TransactionContext context = newTransactionContext(ds1, ds2);
// start transaction
try {
@@ -410,7 +404,7 @@ public class TransactionContextTest {
@Test
public void testAddThenFailure() throws TransactionFailureException, InterruptedException {
- ds2.failCommitTxOnce = InduceFailure.ThrowException;
+ ds2.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
TransactionContext context = newTransactionContext(ds1);
// start transaction
@@ -482,7 +476,7 @@ public class TransactionContextTest {
@Test
public void testAndThenRemoveOnFailure() throws TransactionFailureException {
- ds1.failCommitTxOnce = InduceFailure.ThrowException;
+ ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
TransactionContext context = newTransactionContext();
context.start();
@@ -507,175 +501,4 @@ public class TransactionContextTest {
Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
}
-
- enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
-
- static class DummyTxAware implements TransactionAware {
-
- Transaction tx;
- boolean started = false;
- boolean committed = false;
- boolean checked = false;
- boolean rolledBack = false;
- boolean postCommitted = false;
- List<byte[]> changes = Lists.newArrayList();
-
- InduceFailure failStartTxOnce = InduceFailure.NoFailure;
- InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
- InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
- InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
- InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
-
- void addChange(byte[] key) {
- changes.add(key);
- }
-
- void reset() {
- tx = null;
- started = false;
- checked = false;
- committed = false;
- rolledBack = false;
- postCommitted = false;
- changes.clear();
- }
-
- @Override
- public void startTx(Transaction tx) {
- reset();
- started = true;
- this.tx = tx;
- if (failStartTxOnce == InduceFailure.ThrowException) {
- failStartTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("start failure");
- }
- }
-
- @Override
- public void updateTx(Transaction tx) {
- this.tx = tx;
- }
-
- @Override
- public Collection<byte[]> getTxChanges() {
- checked = true;
- if (failChangesTxOnce == InduceFailure.ThrowException) {
- failChangesTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("changes failure");
- }
- return ImmutableList.copyOf(changes);
- }
-
- @Override
- public boolean commitTx() throws Exception {
- committed = true;
- if (failCommitTxOnce == InduceFailure.ThrowException) {
- failCommitTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("persist failure");
- }
- if (failCommitTxOnce == InduceFailure.ReturnFalse) {
- failCommitTxOnce = InduceFailure.NoFailure;
- return false;
- }
- return true;
- }
-
- @Override
- public void postTxCommit() {
- postCommitted = true;
- if (failPostCommitTxOnce == InduceFailure.ThrowException) {
- failPostCommitTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("post failure");
- }
- }
-
- @Override
- public boolean rollbackTx() throws Exception {
- rolledBack = true;
- if (failRollbackTxOnce == InduceFailure.ThrowException) {
- failRollbackTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("rollback failure");
- }
- if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
- failRollbackTxOnce = InduceFailure.NoFailure;
- return false;
- }
- return true;
- }
-
- @Override
- public String getTransactionAwareName() {
- return "dummy";
- }
- }
-
- static class DummyTxClient extends InMemoryTxSystemClient {
-
- boolean failCanCommitOnce = false;
- int failCommits = 0;
- enum CommitState {
- Started, Committed, Aborted, Invalidated
- }
- CommitState state = CommitState.Started;
-
- @Inject
- DummyTxClient(TransactionManager txmgr) {
- super(txmgr);
- }
-
- @Override
- public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
- if (failCanCommitOnce) {
- failCanCommitOnce = false;
- return false;
- } else {
- return super.canCommit(tx, changeIds);
- }
- }
-
- @Override
- public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException {
- return canCommit(tx, changeIds);
- }
-
- @Override
- public boolean commit(Transaction tx) throws TransactionNotInProgressException {
- if (failCommits-- > 0) {
- return false;
- } else {
- state = CommitState.Committed;
- return super.commit(tx);
- }
- }
-
- @Override
- public Transaction startLong() {
- state = CommitState.Started;
- return super.startLong();
- }
-
- @Override
- public Transaction startShort() {
- state = CommitState.Started;
- return super.startShort();
- }
-
- @Override
- public Transaction startShort(int timeout) {
- state = CommitState.Started;
- return super.startShort(timeout);
- }
-
- @Override
- public void abort(Transaction tx) {
- state = CommitState.Aborted;
- super.abort(tx);
- }
-
- @Override
- public boolean invalidate(long tx) {
- state = CommitState.Invalidated;
- return super.invalidate(tx);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
index 676774c..506ffd9 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
@@ -19,19 +19,16 @@
package org.apache.tephra;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
-import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.util.Modules;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tephra.inmemory.InMemoryTxSystemClient;
import org.apache.tephra.runtime.ConfigModule;
import org.apache.tephra.runtime.DiscoveryModules;
import org.apache.tephra.runtime.TransactionModules;
-import org.apache.tephra.snapshot.DefaultSnapshotCodec;
+import org.apache.tephra.snapshot.SnapshotCodecV4;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -41,7 +38,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Collection;
-import java.util.List;
import javax.annotation.Nullable;
/**
@@ -57,7 +53,7 @@ public class TransactionExecutorTest {
@BeforeClass
public static void setup() throws IOException {
final Configuration conf = new Configuration();
- conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+ conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
Injector injector = Guice.createInjector(
new ConfigModule(conf),
@@ -77,8 +73,8 @@ public class TransactionExecutorTest {
factory = injector.getInstance(TransactionExecutorFactory.class);
}
- final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
- final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2);
+ private final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
+ private final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2);
private TransactionExecutor getExecutor() {
return factory.createExecutor(txAwares);
@@ -88,10 +84,10 @@ public class TransactionExecutorTest {
return new DefaultTransactionExecutor(txClient, txAwares, RetryStrategies.noRetries());
}
- static final byte[] A = { 'a' };
- static final byte[] B = { 'b' };
+ private static final byte[] A = { 'a' };
+ private static final byte[] B = { 'b' };
- final TransactionExecutor.Function<Integer, Integer> testFunction =
+ private final TransactionExecutor.Function<Integer, Integer> testFunction =
new TransactionExecutor.Function<Integer, Integer>() {
@Override
public Integer apply(@Nullable Integer input) {
@@ -131,7 +127,7 @@ public class TransactionExecutorTest {
@Test
public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
- ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
+ ds1.failPostCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
// execute: add a change to ds1 and ds2
try {
getExecutor().execute(testFunction, 10);
@@ -155,7 +151,7 @@ public class TransactionExecutorTest {
@Test
public void testPersistFailure() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ThrowException;
+ ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
// execute: add a change to ds1 and ds2
try {
getExecutor().execute(testFunction, 10);
@@ -179,7 +175,7 @@ public class TransactionExecutorTest {
@Test
public void testPersistFalse() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+ ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
// execute: add a change to ds1 and ds2
try {
getExecutor().execute(testFunction, 10);
@@ -203,8 +199,8 @@ public class TransactionExecutorTest {
@Test
public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ThrowException;
- ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+ ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException;
// execute: add a change to ds1 and ds2
try {
getExecutor().execute(testFunction, 10);
@@ -228,8 +224,8 @@ public class TransactionExecutorTest {
@Test
public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
- ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+ ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
+ ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
// execute: add a change to ds1 and ds2
try {
getExecutor().execute(testFunction, 10);
@@ -351,8 +347,8 @@ public class TransactionExecutorTest {
@Test
public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
- ds1.failChangesTxOnce = InduceFailure.ThrowException;
- ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+ ds1.failChangesTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException;
// execute: add a change to ds1 and ds2
try {
getExecutor().execute(testFunction, 10);
@@ -376,7 +372,7 @@ public class TransactionExecutorTest {
@Test
public void testFunctionAndRollbackFailure() throws TransactionFailureException, InterruptedException {
- ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+ ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
// execute: add a change to ds1 and ds2
try {
getExecutor().execute(testFunction, null);
@@ -400,7 +396,7 @@ public class TransactionExecutorTest {
@Test
public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
- ds1.failStartTxOnce = InduceFailure.ThrowException;
+ ds1.failStartTxOnce = DummyTxAware.InduceFailure.ThrowException;
// execute: add a change to ds1 and ds2
try {
getExecutor().execute(testFunction, 10);
@@ -421,175 +417,4 @@ public class TransactionExecutorTest {
Assert.assertFalse(ds2.rolledBack);
Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
}
-
- enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
-
- static class DummyTxAware implements TransactionAware {
-
- Transaction tx;
- boolean started = false;
- boolean committed = false;
- boolean checked = false;
- boolean rolledBack = false;
- boolean postCommitted = false;
- List<byte[]> changes = Lists.newArrayList();
-
- InduceFailure failStartTxOnce = InduceFailure.NoFailure;
- InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
- InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
- InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
- InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
-
- void addChange(byte[] key) {
- changes.add(key);
- }
-
- void reset() {
- tx = null;
- started = false;
- checked = false;
- committed = false;
- rolledBack = false;
- postCommitted = false;
- changes.clear();
- }
-
- @Override
- public void startTx(Transaction tx) {
- reset();
- started = true;
- this.tx = tx;
- if (failStartTxOnce == InduceFailure.ThrowException) {
- failStartTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("start failure");
- }
- }
-
- @Override
- public void updateTx(Transaction tx) {
- this.tx = tx;
- }
-
- @Override
- public Collection<byte[]> getTxChanges() {
- checked = true;
- if (failChangesTxOnce == InduceFailure.ThrowException) {
- failChangesTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("changes failure");
- }
- return ImmutableList.copyOf(changes);
- }
-
- @Override
- public boolean commitTx() throws Exception {
- committed = true;
- if (failCommitTxOnce == InduceFailure.ThrowException) {
- failCommitTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("persist failure");
- }
- if (failCommitTxOnce == InduceFailure.ReturnFalse) {
- failCommitTxOnce = InduceFailure.NoFailure;
- return false;
- }
- return true;
- }
-
- @Override
- public void postTxCommit() {
- postCommitted = true;
- if (failPostCommitTxOnce == InduceFailure.ThrowException) {
- failPostCommitTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("post failure");
- }
- }
-
- @Override
- public boolean rollbackTx() throws Exception {
- rolledBack = true;
- if (failRollbackTxOnce == InduceFailure.ThrowException) {
- failRollbackTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("rollback failure");
- }
- if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
- failRollbackTxOnce = InduceFailure.NoFailure;
- return false;
- }
- return true;
- }
-
- @Override
- public String getTransactionAwareName() {
- return "dummy";
- }
- }
-
- static class DummyTxClient extends InMemoryTxSystemClient {
-
- boolean failCanCommitOnce = false;
- int failCommits = 0;
- enum CommitState {
- Started, Committed, Aborted, Invalidated
- }
- CommitState state = CommitState.Started;
-
- @Inject
- DummyTxClient(TransactionManager txmgr) {
- super(txmgr);
- }
-
- @Override
- public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
- if (failCanCommitOnce) {
- failCanCommitOnce = false;
- return false;
- } else {
- return super.canCommit(tx, changeIds);
- }
- }
-
- @Override
- public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException {
- return canCommit(tx, changeIds);
- }
-
- @Override
- public boolean commit(Transaction tx) throws TransactionNotInProgressException {
- if (failCommits-- > 0) {
- return false;
- } else {
- state = CommitState.Committed;
- return super.commit(tx);
- }
- }
-
- @Override
- public Transaction startLong() {
- state = CommitState.Started;
- return super.startLong();
- }
-
- @Override
- public Transaction startShort() {
- state = CommitState.Started;
- return super.startShort();
- }
-
- @Override
- public Transaction startShort(int timeout) {
- state = CommitState.Started;
- return super.startShort(timeout);
- }
-
- @Override
- public void abort(Transaction tx) {
- state = CommitState.Aborted;
- super.abort(tx);
- }
-
- @Override
- public boolean invalidate(long tx) {
- state = CommitState.Invalidated;
- return super.invalidate(tx);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
index 819a981..b16d93d 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
@@ -31,12 +31,14 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
*
*/
+@SuppressWarnings("WeakerAccess")
public class TransactionManagerTest extends TransactionSystemTest {
private static Configuration conf;
@@ -76,6 +78,92 @@ public class TransactionManagerTest extends TransactionSystemTest {
}
@Test
+ public void testCheckpointing() throws TransactionFailureException {
+ // create a few transactions
+ Transaction tx1 = txManager.startShort();
+ Transaction tx2 = txManager.startShort();
+ Transaction tx3 = txManager.startShort();
+
+ // start and commit a few
+ for (int i = 0; i < 5; i++) {
+ Transaction tx = txManager.startShort();
+ txManager.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i }));
+ txManager.commit(tx.getTransactionId(), tx.getWritePointer());
+ }
+
+ // checkpoint the transactions
+ Transaction tx3c = txManager.checkpoint(tx3);
+ Transaction tx2c = txManager.checkpoint(tx2);
+ Transaction tx1c = txManager.checkpoint(tx1);
+
+ // start and commit a few (this moves the read pointer past all checkpoint write versions)
+ for (int i = 5; i < 10; i++) {
+ Transaction tx = txManager.startShort();
+ txManager.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i }));
+ txManager.commit(tx.getTransactionId(), tx.getWritePointer());
+ }
+
+ // start new tx and validate all write pointers are excluded
+ Transaction tx = txManager.startShort();
+ validateSorted(tx.getInProgress());
+ validateSorted(tx.getInvalids());
+ Assert.assertFalse(tx.isVisible(tx1.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx1c.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+ txManager.abort(tx);
+
+ // abort one of the checkpoints
+ txManager.abort(tx1c);
+
+ // start new tx and validate all write pointers are excluded
+ tx = txManager.startShort();
+ validateSorted(tx.getInProgress());
+ validateSorted(tx.getInvalids());
+ Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+ txManager.abort(tx);
+
+ // invalidate one of the checkpoints
+ txManager.invalidate(tx2c.getTransactionId());
+
+ // start new tx and validate all write pointers are excluded
+ tx = txManager.startShort();
+ validateSorted(tx.getInProgress());
+ validateSorted(tx.getInvalids());
+ Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+ txManager.abort(tx);
+
+ // commit the last checkpoint
+ txManager.canCommit(tx3.getTransactionId(), Collections.<byte[]>emptyList());
+ txManager.commit(tx3c.getTransactionId(), tx3c.getWritePointer());
+
+ // start new tx and validate all write pointers are excluded
+ tx = txManager.startShort();
+ validateSorted(tx.getInProgress());
+ validateSorted(tx.getInvalids());
+ Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+ txManager.abort(tx);
+ }
+
+ private void validateSorted(long[] array) {
+ Long lastSeen = null;
+ for (long value : array) {
+ Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)),
+ lastSeen == null || lastSeen < value);
+ lastSeen = value;
+ }
+ }
+
+ @Test
public void testTransactionCleanup() throws Exception {
Configuration config = new Configuration(conf);
config.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
@@ -98,8 +186,8 @@ public class TransactionManagerTest extends TransactionSystemTest {
// start and commit a bunch of transactions
for (int i = 0; i < 10; i++) {
Transaction tx = txm.startShort();
- Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
- Assert.assertTrue(txm.commit(tx));
+ txm.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i }));
+ txm.commit(tx.getTransactionId(), tx.getWritePointer());
}
// all of these should still be in the committed set
Assert.assertEquals(0, txm.getInvalidSize());
@@ -124,14 +212,14 @@ public class TransactionManagerTest extends TransactionSystemTest {
tx2.getWritePointer(),
tx2c.getWritePointer()}, txx.getInvalids());
// try to commit the last transaction that was started
- Assert.assertTrue(txm.canCommit(txx, Collections.singleton(new byte[] { 0x0a })));
- Assert.assertTrue(txm.commit(txx));
+ txm.canCommit(txx.getTransactionId(), Collections.singleton(new byte[] { 0x0a }));
+ txm.commit(txx.getTransactionId(), txx.getWritePointer());
// now the committed change sets should be empty again
Assert.assertEquals(0, txm.getCommittedSize());
// cannot commit transaction as it was timed out
try {
- txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
+ txm.canCommit(tx1.getTransactionId(), Collections.singleton(new byte[] { 0x11 }));
Assert.fail();
} catch (TransactionNotInProgressException e) {
// expected
@@ -148,14 +236,14 @@ public class TransactionManagerTest extends TransactionSystemTest {
// run another bunch of transactions
for (int i = 0; i < 10; i++) {
Transaction tx = txm.startShort();
- Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
- Assert.assertTrue(txm.commit(tx));
+ txm.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i }));
+ txm.commit(tx.getTransactionId(), tx.getWritePointer());
}
// none of these should still be in the committed set (tx2 is long-running).
Assert.assertEquals(0, txm.getInvalidSize());
Assert.assertEquals(0, txm.getCommittedSize());
// commit tx2, abort tx3
- Assert.assertTrue(txm.commit(ltx1));
+ txm.commit(ltx1.getTransactionId(), ltx1.getWritePointer());
txm.abort(ltx2);
// none of these should still be in the committed set (tx2 is long-running).
// Only tx3 is invalid list as it was aborted and is long-running. tx1 is short one and it rolled back its changes
@@ -196,7 +284,7 @@ public class TransactionManagerTest extends TransactionSystemTest {
// cannot commit transaction as it was timed out
try {
- txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
+ txm.canCommit(tx1.getTransactionId(), Collections.singleton(new byte[] { 0x11 }));
Assert.fail();
} catch (TransactionNotInProgressException e) {
// expected
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
index 5448052..6cbda2f 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
@@ -90,7 +90,7 @@ public abstract class TransactionSystemTest {
}
Transaction tx = client.startShort();
client.canCommitOrThrow(tx, fiftyChanges);
- client.commit(tx);
+ client.commitOrThrow(tx);
// now try another transaction with 51 changes
fiftyChanges.add(new byte[] { 50 });
@@ -112,7 +112,7 @@ public abstract class TransactionSystemTest {
}
tx = client.startShort();
client.canCommitOrThrow(tx, changes2k);
- client.commit(tx);
+ client.commitOrThrow(tx);
// now add another byte to the change set to exceed the limit
changes2k.add(new byte[] { 0 });
@@ -134,14 +134,14 @@ public abstract class TransactionSystemTest {
Transaction tx1 = client1.startShort();
Transaction tx2 = client2.startShort();
- Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1, C2)));
+ client1.canCommitOrThrow(tx1, asList(C1, C2));
// second one also can commit even thought there are conflicts with first since first one hasn't committed yet
- Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2, C3)));
+ client2.canCommitOrThrow(tx2, asList(C2, C3));
- Assert.assertTrue(client1.commit(tx1));
+ client1.commitOrThrow(tx1);
// now second one should not commit, since there are conflicts with tx1 that has been committed
- Assert.assertFalse(client2.commit(tx2));
+ assertCommitConflicts(client2, tx2);
}
@Test
@@ -161,16 +161,16 @@ public abstract class TransactionSystemTest {
Transaction tx4 = client4.startShort();
Transaction tx5 = client5.startShort();
- Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1)));
- Assert.assertTrue(client1.commit(tx1));
+ client1.canCommitOrThrow(tx1, asList(C1));
+ client1.commitOrThrow(tx1);
- Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2)));
- Assert.assertTrue(client2.commit(tx2));
+ client2.canCommitOrThrow(tx2, asList(C2));
+ client2.commitOrThrow(tx2);
// verifying conflicts detection
- Assert.assertFalse(client3.canCommitOrThrow(tx3, asList(C1)));
- Assert.assertFalse(client4.canCommitOrThrow(tx4, asList(C2)));
- Assert.assertTrue(client5.canCommitOrThrow(tx5, asList(C3)));
+ assertCanCommitConflicts(client3, tx3, asList(C1));
+ assertCanCommitConflicts(client4, tx4, asList(C2));
+ client5.canCommitOrThrow(tx5, asList(C3));
}
@Test
@@ -178,15 +178,10 @@ public abstract class TransactionSystemTest {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
- Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
- Assert.assertTrue(client.commit(tx));
+ client.canCommitOrThrow(tx, asList(C1, C2));
+ client.commitOrThrow(tx);
// cannot commit twice same tx
- try {
- Assert.assertFalse(client.commit(tx));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
+ assertCommitNotInProgress(client, tx);
}
@Test
@@ -194,7 +189,7 @@ public abstract class TransactionSystemTest {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
- Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
+ client.canCommitOrThrow(tx, asList(C1, C2));
client.abort(tx);
// abort of not active tx has no affect
client.abort(tx);
@@ -205,21 +200,12 @@ public abstract class TransactionSystemTest {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
- Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
- Assert.assertTrue(client.commit(tx));
+ client.canCommitOrThrow(tx, asList(C1, C2));
+ client.commitOrThrow(tx);
+
// can't re-use same tx again
- try {
- client.canCommitOrThrow(tx, asList(C3, C4));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
- try {
- Assert.assertFalse(client.commit(tx));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
+ assertCanCommitNotInProgress(client, tx, asList(C3, C4));
+ assertCommitNotInProgress(client, tx);
// abort of not active tx has no affect
client.abort(tx);
@@ -229,24 +215,15 @@ public abstract class TransactionSystemTest {
public void testUseNotStarted() throws Exception {
TransactionSystemClient client = getClient();
Transaction tx1 = client.startShort();
- Assert.assertTrue(client.commit(tx1));
+ client.commitOrThrow(tx1);
// we know this is one is older than current writePointer and was not used
Transaction txOld = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() - 1,
new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,
TransactionType.SHORT);
- try {
- Assert.assertFalse(client.canCommitOrThrow(txOld, asList(C3, C4)));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
- try {
- Assert.assertFalse(client.commit(txOld));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
+ assertCanCommitNotInProgress(client, txOld, asList(C3, C4));
+ assertCommitNotInProgress(client, txOld);
+
// abort of not active tx has no affect
client.abort(txOld);
@@ -254,18 +231,9 @@ public abstract class TransactionSystemTest {
Transaction txNew = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() + 1,
new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,
TransactionType.SHORT);
- try {
- Assert.assertFalse(client.canCommitOrThrow(txNew, asList(C3, C4)));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
- try {
- Assert.assertFalse(client.commit(txNew));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
+ assertCanCommitNotInProgress(client, txNew, asList(C3, C4));
+ assertCommitNotInProgress(client, txNew);
+
// abort of not active tx has no affect
client.abort(txNew);
}
@@ -275,8 +243,9 @@ public abstract class TransactionSystemTest {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
- Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
- Assert.assertTrue(client.commit(tx));
+ client.canCommitOrThrow(tx, asList(C1, C2));
+ client.commitOrThrow(tx);
+
// abort of not active tx has no affect
client.abort(tx);
}
@@ -292,7 +261,7 @@ public abstract class TransactionSystemTest {
// Cannot invalidate a committed tx
Transaction tx2 = client.startShort();
client.canCommitOrThrow(tx2, asList(C3, C4));
- client.commit(tx2);
+ client.commitOrThrow(tx2);
Assert.assertFalse(client.invalidate(tx2.getTransactionId()));
}
@@ -306,7 +275,7 @@ public abstract class TransactionSystemTest {
Transaction tx1 = client.startShort();
Transaction tx2 = client.startShort();
client.canCommitOrThrow(tx1, asList(C1, C2));
- client.commit(tx1);
+ client.commitOrThrow(tx1);
client.canCommitOrThrow(tx2, asList(C3, C4));
Transaction txPreReset = client.startShort();
@@ -409,8 +378,8 @@ public abstract class TransactionSystemTest {
// start and commit a few
for (int i = 0; i < 5; i++) {
Transaction tx = client.startShort();
- Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
- Assert.assertTrue(client.commit(tx));
+ client.canCommitOrThrow(tx, Collections.singleton(new byte[] { (byte) i }));
+ client.commitOrThrow(tx);
}
// checkpoint the transactions
@@ -421,8 +390,8 @@ public abstract class TransactionSystemTest {
// start and commit a few (this moves the read pointer past all checkpoint write versions)
for (int i = 5; i < 10; i++) {
Transaction tx = client.startShort();
- Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
- Assert.assertTrue(client.commit(tx));
+ client.canCommitOrThrow(tx, Collections.singleton(new byte[] { (byte) i }));
+ client.commitOrThrow(tx);
}
// start new tx and validate all write pointers are excluded
@@ -464,8 +433,8 @@ public abstract class TransactionSystemTest {
client.abort(tx);
// commit the last checkpoint
- Assert.assertTrue(client.canCommit(tx3, Collections.<byte[]>emptyList()));
- Assert.assertTrue(client.commit(tx3c));
+ client.canCommitOrThrow(tx3, Collections.<byte[]>emptyList());
+ client.commitOrThrow(tx3c);
// start new tx and validate all write pointers are excluded
tx = client.startShort();
@@ -485,6 +454,46 @@ public abstract class TransactionSystemTest {
}
}
+ private void assertCommitConflicts(TransactionSystemClient client, Transaction tx)
+ throws TransactionFailureException {
+ try {
+ client.commitOrThrow(tx);
+ Assert.fail();
+ } catch (TransactionConflictException e) {
+ //expected
+ }
+ }
+
+ private void assertCanCommitConflicts(TransactionSystemClient client, Transaction tx, Collection<byte[]> changes)
+ throws TransactionFailureException {
+ try {
+ client.canCommitOrThrow(tx, changes);
+ Assert.fail();
+ } catch (TransactionConflictException e) {
+ //expected
+ }
+ }
+
+ private void assertCommitNotInProgress(TransactionSystemClient client, Transaction tx)
+ throws TransactionFailureException {
+ try {
+ client.commitOrThrow(tx);
+ Assert.fail();
+ } catch (TransactionNotInProgressException e) {
+ //expected
+ }
+ }
+
+ private void assertCanCommitNotInProgress(TransactionSystemClient client, Transaction tx, Collection<byte[]> changes)
+ throws TransactionFailureException {
+ try {
+ client.canCommitOrThrow(tx, changes);
+ Assert.fail();
+ } catch (TransactionNotInProgressException e) {
+ //expected
+ }
+ }
+
private Collection<byte[]> asList(byte[]... val) {
return Arrays.asList(val);
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
index 6075452..bd48f7a 100644
--- a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
@@ -197,7 +197,7 @@ public class ThriftTransactionServerTest {
// simply start + commit transaction
TransactionSystemClient txClient = getClient();
Transaction tx = txClient.startShort();
- txClient.commit(tx);
+ txClient.commitOrThrow(tx);
// Expire zookeeper session, which causes Thrift server to stop running.
expireZkSession(zkClientService);
@@ -215,7 +215,7 @@ public class ThriftTransactionServerTest {
txClient = getClient();
// verify that we can start and commit a transaction after becoming leader again
tx = txClient.startShort();
- txClient.commit(tx);
+ txClient.commitOrThrow(tx);
}
private void expireZkSession(ZKClientService zkClientService) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
index 971c93c..961d368 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
@@ -28,6 +28,7 @@ import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.ChangeId;
import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionConflictException;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionType;
import org.apache.tephra.TxConstants;
@@ -142,11 +143,11 @@ public abstract class AbstractTransactionStateStorageTest {
txManager.invalidate(invalid.getTransactionId());
// start a tx1, add a change A and commit
Transaction tx1 = txManager.startShort("client1");
- Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
- Assert.assertTrue(txManager.commit(tx1));
+ txManager.canCommit(tx1.getTransactionId(), Collections.singleton(a));
+ txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
// start a tx2 and add a change B
Transaction tx2 = txManager.startShort("client2");
- Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
+ txManager.canCommit(tx2.getTransactionId(), Collections.singleton(b));
// start a tx3
Transaction tx3 = txManager.startShort("client3");
// restart
@@ -172,7 +173,7 @@ public abstract class AbstractTransactionStateStorageTest {
txManager.abort(invalid);
// commit tx2
- Assert.assertTrue(txManager.commit(tx2));
+ txManager.commit(tx2.getTransactionId(), tx2.getWritePointer());
// start another transaction, must be greater than tx3
Transaction tx4 = txManager.startShort();
Assert.assertTrue(tx4.getTransactionId() > tx3.getTransactionId());
@@ -181,7 +182,12 @@ public abstract class AbstractTransactionStateStorageTest {
Assert.assertFalse(tx2.isVisible(tx3.getTransactionId()));
Assert.assertFalse(tx2.isVisible(tx4.getTransactionId()));
// add same change for tx3
- Assert.assertFalse(txManager.canCommit(tx3, Collections.singleton(b)));
+ try {
+ txManager.canCommit(tx3.getTransactionId(), Collections.singleton(b));
+ Assert.fail("canCommit() should have failed");
+ } catch (TransactionConflictException e) {
+ // expected
+ }
// check visibility with new xaction
Transaction tx5 = txManager.startShort();
Assert.assertTrue(tx5.isVisible(tx1.getTransactionId()));
@@ -252,11 +258,11 @@ public abstract class AbstractTransactionStateStorageTest {
final byte[] b = { 'b' };
// start a tx1, add a change A and commit
Transaction tx1 = txManager.startShort();
- Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
- Assert.assertTrue(txManager.commit(tx1));
+ txManager.canCommit(tx1.getTransactionId(), Collections.singleton(a));
+ txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
// start a tx2 and add a change B
Transaction tx2 = txManager.startShort();
- Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
+ txManager.canCommit(tx2.getTransactionId(), Collections.singleton(b));
// start a tx3
Transaction tx3 = txManager.startShort();
TransactionSnapshot origState = txManager.getCurrentState();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
index 9c565ba..98d1148 100644
--- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
@@ -364,8 +364,8 @@ public class SnapshotCodecTest {
Assert.assertTrue(inProgressTx.getCheckpointWritePointers().isEmpty());
// Should be able to commit the transaction
- Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList()));
- Assert.assertTrue(txManager.commit(checkpointTx));
+ txManager.canCommit(checkpointTx.getTransactionId(), Collections.<byte[]>emptyList());
+ txManager.commit(checkpointTx.getTransactionId(), checkpointTx.getWritePointer());
// save a new snapshot
txManager.stopAndWait();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 3c7d1e2..9615d8e 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.tephra.ChangeId;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
@@ -89,7 +88,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -151,7 +149,7 @@ public class TransactionProcessorTest {
// this will set visibility upper bound to V[6]
Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
- new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
+ new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>());
txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
txSnapshot.getInProgress());
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index d826bad..4d34ed9 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -37,7 +37,6 @@ import java.util.Map;
import javax.annotation.Nullable;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/**
* HBase 0.96 specific test for filtering logic applied when reading data transactionally.
@@ -250,20 +249,20 @@ public class TransactionVisibilityFilterTest extends AbstractTransactionVisibili
*/
Transaction tx1 = txManager.startShort();
- assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET));
- assertTrue(txManager.commit(tx1));
+ txManager.canCommit(tx1.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
Transaction tx2 = txManager.startShort();
- assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET));
- assertTrue(txManager.commit(tx2));
+ txManager.canCommit(tx2.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx2.getTransactionId(), tx2.getWritePointer());
Transaction tx3 = txManager.startShort();
Transaction tx4 = txManager.startShort();
txManager.invalidate(tx4.getTransactionId());
Transaction tx5 = txManager.startShort();
- assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET));
- assertTrue(txManager.commit(tx5));
+ txManager.canCommit(tx5.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx5.getTransactionId(), tx5.getWritePointer());
Transaction tx6 = txManager.startShort();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index b8e051b..dcb8314 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.tephra.ChangeId;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
@@ -95,7 +94,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -156,7 +154,7 @@ public class TransactionProcessorTest {
// this will set visibility upper bound to V[6]
Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
- new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
+ new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>());
txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
txSnapshot.getInProgress());
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index 7a57aac..3352eef 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -37,7 +37,6 @@ import java.util.Map;
import javax.annotation.Nullable;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/**
* HBase 0.98 specific test for filtering logic applied when reading data transactionally.
@@ -249,20 +248,20 @@ public class TransactionVisibilityFilterTest extends AbstractTransactionVisibili
*/
Transaction tx1 = txManager.startShort();
- assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET));
- assertTrue(txManager.commit(tx1));
+ txManager.canCommit(tx1.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
Transaction tx2 = txManager.startShort();
- assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET));
- assertTrue(txManager.commit(tx2));
+ txManager.canCommit(tx2.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx2.getTransactionId(), tx2.getWritePointer());
Transaction tx3 = txManager.startShort();
Transaction tx4 = txManager.startShort();
txManager.invalidate(tx4.getTransactionId());
Transaction tx5 = txManager.startShort();
- assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET));
- assertTrue(txManager.commit(tx5));
+ txManager.canCommit(tx5.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx5.getTransactionId(), tx5.getWritePointer());
Transaction tx6 = txManager.startShort();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 9ce30b5..016adbd 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.tephra.ChangeId;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
@@ -77,7 +76,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
@@ -136,7 +134,7 @@ public class TransactionProcessorTest {
// this will set visibility upper bound to V[6]
Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
- new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
+ new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>());
txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
txSnapshot.getInProgress());
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index 5b9802d..c27a10d 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -37,7 +37,6 @@ import java.util.Map;
import javax.annotation.Nullable;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/**
* HBase 1.0 (CDH) specific test for filtering logic applied when reading data transactionally.
@@ -249,20 +248,20 @@ public class TransactionVisibilityFilterTest extends AbstractTransactionVisibili
*/
Transaction tx1 = txManager.startShort();
- assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET));
- assertTrue(txManager.commit(tx1));
+ txManager.canCommit(tx1.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
Transaction tx2 = txManager.startShort();
- assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET));
- assertTrue(txManager.commit(tx2));
+ txManager.canCommit(tx2.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx2.getTransactionId(), tx2.getWritePointer());
Transaction tx3 = txManager.startShort();
Transaction tx4 = txManager.startShort();
txManager.invalidate(tx4.getTransactionId());
Transaction tx5 = txManager.startShort();
- assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET));
- assertTrue(txManager.commit(tx5));
+ txManager.canCommit(tx5.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx5.getTransactionId(), tx5.getWritePointer());
Transaction tx6 = txManager.startShort();
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 0ec3b46..5328aef 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.tephra.ChangeId;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
@@ -77,7 +76,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
@@ -136,7 +134,7 @@ public class TransactionProcessorTest {
// this will set visibility upper bound to V[6]
Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
- new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
+ new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>());
txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
txSnapshot.getInProgress());