You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/12 23:23:50 UTC

[2/4] incubator-tephra git commit: (TEPHRA-240) Include conflicting key and client id in TransactionConflictException

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
index dd17431..e33cd2c 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
@@ -33,9 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Implementation of the tx system client that doesn't talk to any global service and tries to do its best to meet the
- * tx system requirements/expectations. In fact it implements enough logic to support running flows (when each flowlet
- * uses its own detached tx system client, without talking to each other and sharing any state) with "process exactly
- * once" guarantee if no failures happen.
+ * tx system requirements/expectations.
  *
  * NOTE: Will NOT detect conflicts. May leave inconsistent state when process crashes. Does NOT provide even read
  *       isolation guarantees.
@@ -43,7 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * Good for performance testing. For demoing high throughput. For use-cases with relaxed tx guarantees.
  */
 public class DetachedTxSystemClient implements TransactionSystemClient {
-  // Dataset and queue logic relies on tx id to grow monotonically even after restart. Hence we need to start with
+  // client logic may rely on tx id to grow monotonically even after restart. Hence we need to start with
   // value that is for sure bigger than the last one used before restart.
   // NOTE: with code below we assume we don't do more than InMemoryTransactionManager.MAX_TX_PER_MS tx/ms
   //       by single client
@@ -88,8 +86,8 @@ public class DetachedTxSystemClient implements TransactionSystemClient {
   }
 
   @Override
-  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
-    return true;
+  public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
+    // do nothing
   }
 
   @Override
@@ -98,6 +96,11 @@ public class DetachedTxSystemClient implements TransactionSystemClient {
   }
 
   @Override
+  public void commitOrThrow(Transaction tx) {
+    // do nothing
+  }
+
+  @Override
   public void abort(Transaction tx) {
     // do nothing
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
index 9e57de8..54615fc 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.tephra.inmemory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
 import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.Transaction;
@@ -25,7 +26,6 @@ import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
 import org.apache.tephra.TransactionFailureException;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionNotInProgressException;
-import org.apache.tephra.TransactionSizeException;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
 import org.slf4j.Logger;
@@ -45,6 +45,7 @@ public class InMemoryTxSystemClient implements TransactionSystemClient {
 
   private static final Logger LOG = LoggerFactory.getLogger(InMemoryTxSystemClient.class);
 
+  @VisibleForTesting
   TransactionManager txManager;
 
   @Inject
@@ -70,20 +71,34 @@ public class InMemoryTxSystemClient implements TransactionSystemClient {
   @Override
   public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
     try {
-      return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
-    } catch (TransactionSizeException e) {
+      canCommitOrThrow(tx, changeIds);
+      return true;
+    } catch (TransactionFailureException e) {
       return false;
     }
   }
 
   @Override
-  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException {
-    return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
+  public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds)
+    throws TransactionFailureException {
+    if (!changeIds.isEmpty()) {
+      txManager.canCommit(tx.getTransactionId(), changeIds);
+    }
   }
 
   @Override
   public boolean commit(Transaction tx) throws TransactionNotInProgressException {
-    return txManager.commit(tx);
+    try {
+      commitOrThrow(tx);
+      return true;
+    } catch (TransactionFailureException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public void commitOrThrow(Transaction tx) throws TransactionFailureException {
+    txManager.commit(tx.getTransactionId(), tx.getWritePointer());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
index de46f27..558c1ea 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
@@ -61,8 +61,8 @@ public class MinimalTxSystemClient implements TransactionSystemClient {
   }
 
   @Override
-  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
-    return true;
+  public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
+    // do nothing
   }
 
   @Override
@@ -71,6 +71,11 @@ public class MinimalTxSystemClient implements TransactionSystemClient {
   }
 
   @Override
+  public void commitOrThrow(Transaction tx) {
+    // do nothing
+  }
+
+  @Override
   public void abort(Transaction tx) {
     // do nothing
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
index d76a98f..c31d156 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
@@ -197,8 +197,8 @@ public class TransactionSnapshot implements TransactionVisibilityState {
   public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer,
                                              long writePointer, InvalidTxList invalidTxList,
                                              NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
-                                             Map<Long, Set<ChangeId>> committing,
-                                             NavigableMap<Long, Set<ChangeId>> committed) {
+                                             Map<Long, TransactionManager.ChangeSet> committing,
+                                             NavigableMap<Long, TransactionManager.ChangeSet> committed) {
     // copy invalid IDs, after sorting
     Collection<Long> invalidCopy = new LongArrayList(invalidTxList.toSortedArray());
     // copy in-progress IDs and expirations
@@ -206,13 +206,13 @@ public class TransactionSnapshot implements TransactionVisibilityState {
 
     // for committing and committed maps, we need to copy each individual Set as well to prevent modification
     Map<Long, Set<ChangeId>> committingCopy = Maps.newHashMap();
-    for (Map.Entry<Long, Set<ChangeId>> entry : committing.entrySet()) {
-      committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
+    for (Map.Entry<Long, TransactionManager.ChangeSet> entry : committing.entrySet()) {
+      committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue().getChangeIds()));
     }
 
     NavigableMap<Long, Set<ChangeId>> committedCopy = new TreeMap<>();
-    for (Map.Entry<Long, Set<ChangeId>> entry : committed.entrySet()) {
-      committedCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
+    for (Map.Entry<Long, TransactionManager.ChangeSet> entry : committed.entrySet()) {
+      committedCopy.put(entry.getKey(), new HashSet<>(entry.getValue().getChangeIds()));
     }
 
     return new TransactionSnapshot(snapshotTime, readPointer, writePointer,

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/thrift/transaction.thrift
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift
index 729e035..6cbfdad 100644
--- a/tephra-core/src/main/thrift/transaction.thrift
+++ b/tephra-core/src/main/thrift/transaction.thrift
@@ -43,6 +43,12 @@ struct TTransaction {
   9: TVisibilityLevel visibilityLevel
 }
 
+exception TTransactionConflictException {
+  1: i64 transactionId,
+  2: string conflictingKey,
+  3: string conflictingClient
+}
+
 exception TTransactionNotInProgressException {
   1: string message
 }
@@ -73,15 +79,21 @@ service TTransactionServer {
   // TODO remove this as it was replaced with startShortWithTimeout in 0.10
   TTransaction startShortTimeout(1: i32 timeout),
   TTransaction startShortClientId(1: string clientId) throws (1: TGenericException e),
-  TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws (1:TGenericException e),
-  TTransaction startShortWithTimeout(1: i32 timeout) throws (1:TGenericException e),
-  TBoolean canCommitTx(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException e),
-  TBoolean canCommitOrThrow(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException e,
-                                                                                2:TGenericException g,),
+  TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws (1: TGenericException e),
+  TTransaction startShortWithTimeout(1: i32 timeout) throws (1: TGenericException e),
+  // TODO remove this as it was replaced with canCommitOrThrow in 0.13
+  TBoolean canCommitTx(1: TTransaction tx, 2: set<binary> changes) throws (1: TTransactionNotInProgressException e),
+  void canCommitOrThrow(1: i64 tx, 2: set<binary> changes) throws (1: TTransactionNotInProgressException e,
+                                                                   2: TTransactionConflictException c,
+                                                                   3: TGenericException g),
+  // TODO remove this as it was replaced with commitWithExn in 0.13
   TBoolean commitTx(1: TTransaction tx) throws (1:TTransactionNotInProgressException e),
+  void commitOrThrow(1: i64 txId, 2: i64 wp) throws (1: TTransactionNotInProgressException e,
+                                                     2: TTransactionConflictException c,
+                                                     3: TGenericException g),
   void abortTx(1: TTransaction tx),
-  bool invalidateTx(1: i64 tx),
-  binary getSnapshot() throws (1:TTransactionCouldNotTakeSnapshotException e),
+  bool invalidateTx(1: i64 txid),
+  binary getSnapshot() throws (1: TTransactionCouldNotTakeSnapshotException e),
   void resetState(),
   string status(),
   TBoolean truncateInvalidTx(1: set<i64> txns),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java b/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java
new file mode 100644
index 0000000..1b34255
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test the retention of client ids in Tx Manager
+ */
+@SuppressWarnings("WeakerAccess")
+public class ClientIdTest {
+
+  @Test
+  public void testClientIdRetention() throws TransactionFailureException {
+    testClientIdRetention(TransactionManager.ClientIdRetention.OFF, false, false);
+    testClientIdRetention(TransactionManager.ClientIdRetention.ACTIVE, true, false);
+    testClientIdRetention(TransactionManager.ClientIdRetention.COMMITTED, true, true);
+  }
+
+  private void testClientIdRetention(TransactionManager.ClientIdRetention retention,
+                                     boolean expectClientIdInProgress,
+                                     boolean expectClientIdCommitted) throws TransactionFailureException {
+    Configuration conf = new Configuration();
+    conf.set(TxConstants.Manager.CFG_TX_RETAIN_CLIENT_ID, retention.toString());
+    TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
+    TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+    txManager.startAndWait();
+    try {
+      testConflict(txManager, expectClientIdInProgress, expectClientIdCommitted);
+    } finally {
+      txManager.stopAndWait();
+    }
+  }
+
+  public void testConflict(TransactionManager txManager,
+                           boolean expectClientIdInProgress,
+                           boolean expectClientIdCommitted) throws TransactionFailureException {
+    testConflict(txManager, expectClientIdInProgress, expectClientIdCommitted, true);
+    testConflict(txManager, expectClientIdInProgress, expectClientIdCommitted, false);
+  }
+
+  /**
+   * Tests two conflicting transactions.
+   * The resulting exception must carry the conflicting change key and client id.
+   *
+   * @param expectClientIdInProgress whether to expect client id in in-progress transactions
+   * @param expectClientIdCommitted whether  to expect client id in committed chaneg sets
+   * @param testCanCommit whether the conflict should be induced by canCommit() or by commit()
+   */
+  public void testConflict(TransactionManager txManager,
+                           boolean expectClientIdInProgress,
+                           boolean expectClientIdCommitted,
+                           boolean testCanCommit) throws TransactionFailureException {
+    // start two transactions, validate client id
+    Transaction tx1 = txManager.startShort("clientA");
+    Transaction tx2 = txManager.startShort("clientB");
+    TransactionManager.InProgressTx inProgressTx1 = txManager.getInProgress(tx1.getTransactionId());
+    Assert.assertNotNull(inProgressTx1);
+    if (expectClientIdInProgress) {
+      Assert.assertEquals("clientA", inProgressTx1.getClientId());
+    } else {
+      Assert.assertNull(inProgressTx1.getClientId());
+    }
+
+    // now commit the two transactions with overlapping change sets to create a conflict
+    final byte[] change1 = new byte[] { '1' };
+    final byte[] change2 = new byte[] { '2' };
+    final byte[] change3 = new byte[] { '3' };
+    if (!testCanCommit) {
+      txManager.canCommit(tx2.getTransactionId(), ImmutableList.of(change2, change3));
+    }
+    txManager.canCommit(tx1.getTransactionId(), ImmutableList.of(change1, change2));
+    txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
+    try {
+      if (testCanCommit) {
+        txManager.canCommit(tx2.getTransactionId(), ImmutableList.of(change2, change3));
+      } else {
+        txManager.commit(tx2.getTransactionId(), tx2.getWritePointer());
+      }
+      Assert.fail("canCommit() should have failed with conflict");
+    } catch (TransactionConflictException e) {
+      Assert.assertNotNull(e.getTransactionId());
+      Assert.assertEquals(tx2.getTransactionId(), e.getTransactionId().longValue());
+      Assert.assertEquals("2", e.getConflictingKey());
+      if (expectClientIdCommitted) {
+        Assert.assertEquals("clientA", e.getConflictingClient());
+      } else {
+        Assert.assertNull(e.getConflictingClient());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
new file mode 100644
index 0000000..54e8a8c
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.Collection;
+import java.util.List;
+
+class DummyTxAware implements TransactionAware {
+
+  enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
+
+  boolean started = false;
+  boolean committed = false;
+  boolean checked = false;
+  boolean rolledBack = false;
+  boolean postCommitted = false;
+  private List<byte[]> changes = Lists.newArrayList();
+
+  InduceFailure failStartTxOnce = InduceFailure.NoFailure;
+  InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
+  InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
+  InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
+  InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
+
+  void addChange(byte[] key) {
+    changes.add(key);
+  }
+
+  void reset() {
+    started = false;
+    checked = false;
+    committed = false;
+    rolledBack = false;
+    postCommitted = false;
+    changes.clear();
+  }
+
+  @Override
+  public void startTx(Transaction tx) {
+    reset();
+    started = true;
+    if (failStartTxOnce == InduceFailure.ThrowException) {
+      failStartTxOnce = InduceFailure.NoFailure;
+      throw new RuntimeException("start failure");
+    }
+  }
+
+  @Override
+  public void updateTx(Transaction tx) {
+    // do nothing
+  }
+
+  @Override
+  public Collection<byte[]> getTxChanges() {
+    checked = true;
+    if (failChangesTxOnce == InduceFailure.ThrowException) {
+      failChangesTxOnce = InduceFailure.NoFailure;
+      throw new RuntimeException("changes failure");
+    }
+    return ImmutableList.copyOf(changes);
+  }
+
+  @Override
+  public boolean commitTx() throws Exception {
+    committed = true;
+    if (failCommitTxOnce == InduceFailure.ThrowException) {
+      failCommitTxOnce = InduceFailure.NoFailure;
+      throw new RuntimeException("persist failure");
+    }
+    if (failCommitTxOnce == InduceFailure.ReturnFalse) {
+      failCommitTxOnce = InduceFailure.NoFailure;
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public void postTxCommit() {
+    postCommitted = true;
+    if (failPostCommitTxOnce == InduceFailure.ThrowException) {
+      failPostCommitTxOnce = InduceFailure.NoFailure;
+      throw new RuntimeException("post failure");
+    }
+  }
+
+  @Override
+  public boolean rollbackTx() throws Exception {
+    rolledBack = true;
+    if (failRollbackTxOnce == InduceFailure.ThrowException) {
+      failRollbackTxOnce = InduceFailure.NoFailure;
+      throw new RuntimeException("rollback failure");
+    }
+    if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
+      failRollbackTxOnce = InduceFailure.NoFailure;
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public String getTransactionAwareName() {
+    return "dummy";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java b/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java
new file mode 100644
index 0000000..0321b17
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.inject.Inject;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+
+import java.util.Collection;
+
+class DummyTxClient extends InMemoryTxSystemClient {
+
+  boolean failCanCommitOnce = false;
+  int failCommits = 0;
+  enum CommitState {
+    Started, Committed, Aborted, Invalidated
+  }
+  CommitState state = CommitState.Started;
+
+  @Inject
+  DummyTxClient(TransactionManager txmgr) {
+    super(txmgr);
+  }
+
+  @Override
+  public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds)
+    throws TransactionFailureException {
+    if (failCanCommitOnce) {
+      failCanCommitOnce = false;
+      throw new TransactionConflictException(tx.getTransactionId(), "<unknown>", null);
+    } else {
+      super.canCommitOrThrow(tx, changeIds);
+    }
+  }
+
+  @Override
+  public void commitOrThrow(Transaction tx)
+    throws TransactionFailureException {
+    if (failCommits-- > 0) {
+      throw new TransactionConflictException(tx.getTransactionId(), "<unknown>", null);
+    } else {
+      state = CommitState.Committed;
+      super.commitOrThrow(tx);
+    }
+  }
+
+  @Override
+  public Transaction startLong() {
+    state = CommitState.Started;
+    return super.startLong();
+  }
+
+  @Override
+  public Transaction startShort() {
+    state = CommitState.Started;
+    return super.startShort();
+  }
+
+  @Override
+  public Transaction startShort(int timeout) {
+    state = CommitState.Started;
+    return super.startShort(timeout);
+  }
+
+  @Override
+  public void abort(Transaction tx) {
+    state = CommitState.Aborted;
+    super.abort(tx);
+  }
+
+  @Override
+  public boolean invalidate(long tx) {
+    state = CommitState.Invalidated;
+    return super.invalidate(tx);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
index 5f4675b..fcf793e 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
@@ -18,16 +18,12 @@
 
 package org.apache.tephra;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
-import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import com.google.inject.util.Modules;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.runtime.ConfigModule;
 import org.apache.tephra.runtime.DiscoveryModules;
 import org.apache.tephra.runtime.TransactionModules;
@@ -40,8 +36,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
 
 /**
  * Tests the transaction executor.
@@ -74,10 +68,10 @@ public class TransactionContextTest {
     txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class);
   }
 
-  final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
+  private final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
 
-  static final byte[] A = { 'a' };
-  static final byte[] B = { 'b' };
+  private static final byte[] A = { 'a' };
+  private static final byte[] B = { 'b' };
 
   private static TransactionContext newTransactionContext(TransactionAware... txAwares) {
     return new TransactionContext(txClient, txAwares);
@@ -115,7 +109,7 @@ public class TransactionContextTest {
 
   @Test
   public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
+    ds1.failPostCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
     TransactionContext context = newTransactionContext(ds1, ds2);
     // start transaction
     context.start();
@@ -145,7 +139,7 @@ public class TransactionContextTest {
 
   @Test
   public void testPersistFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ThrowException;
+    ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
     TransactionContext context = newTransactionContext(ds1, ds2);
     // start transaction
     context.start();
@@ -175,7 +169,7 @@ public class TransactionContextTest {
 
   @Test
   public void testPersistFalse() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+    ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
     TransactionContext context = newTransactionContext(ds1, ds2);
     // start transaction
     context.start();
@@ -205,8 +199,8 @@ public class TransactionContextTest {
 
   @Test
   public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ThrowException;
-    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+    ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
+    ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException;
     TransactionContext context = newTransactionContext(ds1, ds2);
     // start transaction
     context.start();
@@ -236,8 +230,8 @@ public class TransactionContextTest {
 
   @Test
   public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
-    ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+    ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
+    ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
     TransactionContext context = newTransactionContext(ds1, ds2);
     // start transaction
     context.start();
@@ -327,8 +321,8 @@ public class TransactionContextTest {
 
   @Test
   public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failChangesTxOnce = InduceFailure.ThrowException;
-    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+    ds1.failChangesTxOnce = DummyTxAware.InduceFailure.ThrowException;
+    ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException;
     TransactionContext context = newTransactionContext(ds1, ds2);
     // start transaction
     context.start();
@@ -358,7 +352,7 @@ public class TransactionContextTest {
 
   @Test
   public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failStartTxOnce = InduceFailure.ThrowException;
+    ds1.failStartTxOnce = DummyTxAware.InduceFailure.ThrowException;
     TransactionContext context = newTransactionContext(ds1, ds2);
     // start transaction
     try {
@@ -410,7 +404,7 @@ public class TransactionContextTest {
 
   @Test
   public void testAddThenFailure() throws TransactionFailureException, InterruptedException {
-    ds2.failCommitTxOnce = InduceFailure.ThrowException;
+    ds2.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
 
     TransactionContext context = newTransactionContext(ds1);
     // start transaction
@@ -482,7 +476,7 @@ public class TransactionContextTest {
 
   @Test
   public void testAndThenRemoveOnFailure() throws TransactionFailureException {
-    ds1.failCommitTxOnce = InduceFailure.ThrowException;
+    ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
     TransactionContext context = newTransactionContext();
 
     context.start();
@@ -507,175 +501,4 @@ public class TransactionContextTest {
 
     Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
   }
-
-  enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
-
-  static class DummyTxAware implements TransactionAware {
-
-    Transaction tx;
-    boolean started = false;
-    boolean committed = false;
-    boolean checked = false;
-    boolean rolledBack = false;
-    boolean postCommitted = false;
-    List<byte[]> changes = Lists.newArrayList();
-
-    InduceFailure failStartTxOnce = InduceFailure.NoFailure;
-    InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
-    InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
-    InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
-    InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
-
-    void addChange(byte[] key) {
-      changes.add(key);
-    }
-
-    void reset() {
-      tx = null;
-      started = false;
-      checked = false;
-      committed = false;
-      rolledBack = false;
-      postCommitted = false;
-      changes.clear();
-    }
-
-    @Override
-    public void startTx(Transaction tx) {
-      reset();
-      started = true;
-      this.tx = tx;
-      if (failStartTxOnce == InduceFailure.ThrowException) {
-        failStartTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("start failure");
-      }
-    }
-
-    @Override
-    public void updateTx(Transaction tx) {
-      this.tx = tx;
-    }
-
-    @Override
-    public Collection<byte[]> getTxChanges() {
-      checked = true;
-      if (failChangesTxOnce == InduceFailure.ThrowException) {
-        failChangesTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("changes failure");
-      }
-      return ImmutableList.copyOf(changes);
-    }
-
-    @Override
-    public boolean commitTx() throws Exception {
-      committed = true;
-      if (failCommitTxOnce == InduceFailure.ThrowException) {
-        failCommitTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("persist failure");
-      }
-      if (failCommitTxOnce == InduceFailure.ReturnFalse) {
-        failCommitTxOnce = InduceFailure.NoFailure;
-        return false;
-      }
-      return true;
-    }
-
-    @Override
-    public void postTxCommit() {
-      postCommitted = true;
-      if (failPostCommitTxOnce == InduceFailure.ThrowException) {
-        failPostCommitTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("post failure");
-      }
-    }
-
-    @Override
-    public boolean rollbackTx() throws Exception {
-      rolledBack = true;
-      if (failRollbackTxOnce == InduceFailure.ThrowException) {
-        failRollbackTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("rollback failure");
-      }
-      if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
-        failRollbackTxOnce = InduceFailure.NoFailure;
-        return false;
-      }
-      return true;
-    }
-
-    @Override
-    public String getTransactionAwareName() {
-      return "dummy";
-    }
-  }
-
-  static class DummyTxClient extends InMemoryTxSystemClient {
-
-    boolean failCanCommitOnce = false;
-    int failCommits = 0;
-    enum CommitState {
-      Started, Committed, Aborted, Invalidated
-    }
-    CommitState state = CommitState.Started;
-
-    @Inject
-    DummyTxClient(TransactionManager txmgr) {
-      super(txmgr);
-    }
-
-    @Override
-    public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
-      if (failCanCommitOnce) {
-        failCanCommitOnce = false;
-        return false;
-      } else {
-        return super.canCommit(tx, changeIds);
-      }
-    }
-
-    @Override
-    public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException {
-      return canCommit(tx, changeIds);
-    }
-
-    @Override
-    public boolean commit(Transaction tx) throws TransactionNotInProgressException {
-      if (failCommits-- > 0) {
-        return false;
-      } else {
-        state = CommitState.Committed;
-        return super.commit(tx);
-      }
-    }
-
-    @Override
-    public Transaction startLong() {
-      state = CommitState.Started;
-      return super.startLong();
-    }
-
-    @Override
-    public Transaction startShort() {
-      state = CommitState.Started;
-      return super.startShort();
-    }
-
-    @Override
-    public Transaction startShort(int timeout) {
-      state = CommitState.Started;
-      return super.startShort(timeout);
-    }
-
-    @Override
-    public void abort(Transaction tx) {
-      state = CommitState.Aborted;
-      super.abort(tx);
-    }
-
-    @Override
-    public boolean invalidate(long tx) {
-      state = CommitState.Invalidated;
-      return super.invalidate(tx);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
index 676774c..506ffd9 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
@@ -19,19 +19,16 @@
 package org.apache.tephra;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
-import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import com.google.inject.util.Modules;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.runtime.ConfigModule;
 import org.apache.tephra.runtime.DiscoveryModules;
 import org.apache.tephra.runtime.TransactionModules;
-import org.apache.tephra.snapshot.DefaultSnapshotCodec;
+import org.apache.tephra.snapshot.SnapshotCodecV4;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -41,7 +38,6 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
 import javax.annotation.Nullable;
 
 /**
@@ -57,7 +53,7 @@ public class TransactionExecutorTest {
   @BeforeClass
   public static void setup() throws IOException {
     final Configuration conf = new Configuration();
-    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
     conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
     Injector injector = Guice.createInjector(
       new ConfigModule(conf),
@@ -77,8 +73,8 @@ public class TransactionExecutorTest {
     factory = injector.getInstance(TransactionExecutorFactory.class);
   }
 
-  final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
-  final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2);
+  private final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
+  private final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2);
 
   private TransactionExecutor getExecutor() {
     return factory.createExecutor(txAwares);
@@ -88,10 +84,10 @@ public class TransactionExecutorTest {
     return new DefaultTransactionExecutor(txClient, txAwares, RetryStrategies.noRetries());
   }
 
-  static final byte[] A = { 'a' };
-  static final byte[] B = { 'b' };
+  private static final byte[] A = { 'a' };
+  private static final byte[] B = { 'b' };
 
-  final TransactionExecutor.Function<Integer, Integer> testFunction =
+  private final TransactionExecutor.Function<Integer, Integer> testFunction =
     new TransactionExecutor.Function<Integer, Integer>() {
       @Override
       public Integer apply(@Nullable Integer input) {
@@ -131,7 +127,7 @@ public class TransactionExecutorTest {
 
   @Test
   public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
+    ds1.failPostCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
     // execute: add a change to ds1 and ds2
     try {
       getExecutor().execute(testFunction, 10);
@@ -155,7 +151,7 @@ public class TransactionExecutorTest {
 
   @Test
   public void testPersistFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ThrowException;
+    ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
     // execute: add a change to ds1 and ds2
     try {
       getExecutor().execute(testFunction, 10);
@@ -179,7 +175,7 @@ public class TransactionExecutorTest {
 
   @Test
   public void testPersistFalse() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+    ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
     // execute: add a change to ds1 and ds2
     try {
       getExecutor().execute(testFunction, 10);
@@ -203,8 +199,8 @@ public class TransactionExecutorTest {
 
   @Test
   public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ThrowException;
-    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+    ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
+    ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException;
     // execute: add a change to ds1 and ds2
     try {
       getExecutor().execute(testFunction, 10);
@@ -228,8 +224,8 @@ public class TransactionExecutorTest {
 
   @Test
   public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
-    ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+    ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
+    ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
     // execute: add a change to ds1 and ds2
     try {
       getExecutor().execute(testFunction, 10);
@@ -351,8 +347,8 @@ public class TransactionExecutorTest {
 
   @Test
   public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failChangesTxOnce = InduceFailure.ThrowException;
-    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+    ds1.failChangesTxOnce = DummyTxAware.InduceFailure.ThrowException;
+    ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException;
     // execute: add a change to ds1 and ds2
     try {
       getExecutor().execute(testFunction, 10);
@@ -376,7 +372,7 @@ public class TransactionExecutorTest {
 
   @Test
   public void testFunctionAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+    ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ReturnFalse;
     // execute: add a change to ds1 and ds2
     try {
       getExecutor().execute(testFunction, null);
@@ -400,7 +396,7 @@ public class TransactionExecutorTest {
 
   @Test
   public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failStartTxOnce = InduceFailure.ThrowException;
+    ds1.failStartTxOnce = DummyTxAware.InduceFailure.ThrowException;
     // execute: add a change to ds1 and ds2
     try {
       getExecutor().execute(testFunction, 10);
@@ -421,175 +417,4 @@ public class TransactionExecutorTest {
     Assert.assertFalse(ds2.rolledBack);
     Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
   }
-
-  enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
-
-  static class DummyTxAware implements TransactionAware {
-
-    Transaction tx;
-    boolean started = false;
-    boolean committed = false;
-    boolean checked = false;
-    boolean rolledBack = false;
-    boolean postCommitted = false;
-    List<byte[]> changes = Lists.newArrayList();
-
-    InduceFailure failStartTxOnce = InduceFailure.NoFailure;
-    InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
-    InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
-    InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
-    InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
-
-    void addChange(byte[] key) {
-      changes.add(key);
-    }
-
-    void reset() {
-      tx = null;
-      started = false;
-      checked = false;
-      committed = false;
-      rolledBack = false;
-      postCommitted = false;
-      changes.clear();
-    }
-
-    @Override
-    public void startTx(Transaction tx) {
-      reset();
-      started = true;
-      this.tx = tx;
-      if (failStartTxOnce == InduceFailure.ThrowException) {
-        failStartTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("start failure");
-      }
-    }
-
-    @Override
-    public void updateTx(Transaction tx) {
-      this.tx = tx;
-    }
-
-    @Override
-    public Collection<byte[]> getTxChanges() {
-      checked = true;
-      if (failChangesTxOnce == InduceFailure.ThrowException) {
-        failChangesTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("changes failure");
-      }
-      return ImmutableList.copyOf(changes);
-    }
-
-    @Override
-    public boolean commitTx() throws Exception {
-      committed = true;
-      if (failCommitTxOnce == InduceFailure.ThrowException) {
-        failCommitTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("persist failure");
-      }
-      if (failCommitTxOnce == InduceFailure.ReturnFalse) {
-        failCommitTxOnce = InduceFailure.NoFailure;
-        return false;
-      }
-      return true;
-    }
-
-    @Override
-    public void postTxCommit() {
-      postCommitted = true;
-      if (failPostCommitTxOnce == InduceFailure.ThrowException) {
-        failPostCommitTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("post failure");
-      }
-    }
-
-    @Override
-    public boolean rollbackTx() throws Exception {
-      rolledBack = true;
-      if (failRollbackTxOnce == InduceFailure.ThrowException) {
-        failRollbackTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("rollback failure");
-      }
-      if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
-        failRollbackTxOnce = InduceFailure.NoFailure;
-        return false;
-      }
-      return true;
-    }
-
-    @Override
-    public String getTransactionAwareName() {
-      return "dummy";
-    }
-  }
-
-  static class DummyTxClient extends InMemoryTxSystemClient {
-
-    boolean failCanCommitOnce = false;
-    int failCommits = 0;
-    enum CommitState {
-      Started, Committed, Aborted, Invalidated
-    }
-    CommitState state = CommitState.Started;
-
-    @Inject
-    DummyTxClient(TransactionManager txmgr) {
-      super(txmgr);
-    }
-
-    @Override
-    public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
-      if (failCanCommitOnce) {
-        failCanCommitOnce = false;
-        return false;
-      } else {
-        return super.canCommit(tx, changeIds);
-      }
-    }
-
-    @Override
-    public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException {
-      return canCommit(tx, changeIds);
-    }
-
-    @Override
-    public boolean commit(Transaction tx) throws TransactionNotInProgressException {
-      if (failCommits-- > 0) {
-        return false;
-      } else {
-        state = CommitState.Committed;
-        return super.commit(tx);
-      }
-    }
-
-    @Override
-    public Transaction startLong() {
-      state = CommitState.Started;
-      return super.startLong();
-    }
-
-    @Override
-    public Transaction startShort() {
-      state = CommitState.Started;
-      return super.startShort();
-    }
-
-    @Override
-    public Transaction startShort(int timeout) {
-      state = CommitState.Started;
-      return super.startShort(timeout);
-    }
-
-    @Override
-    public void abort(Transaction tx) {
-      state = CommitState.Aborted;
-      super.abort(tx);
-    }
-
-    @Override
-    public boolean invalidate(long tx) {
-      state = CommitState.Invalidated;
-      return super.invalidate(tx);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
index 819a981..b16d93d 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
@@ -31,12 +31,14 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 /**
  *
  */
+@SuppressWarnings("WeakerAccess")
 public class TransactionManagerTest extends TransactionSystemTest {
 
   private static Configuration conf;
@@ -76,6 +78,92 @@ public class TransactionManagerTest extends TransactionSystemTest {
   }
 
   @Test
+  public void testCheckpointing() throws TransactionFailureException {
+    // create a few transactions
+    Transaction tx1 = txManager.startShort();
+    Transaction tx2 = txManager.startShort();
+    Transaction tx3 = txManager.startShort();
+
+    // start and commit a few
+    for (int i = 0; i < 5; i++) {
+      Transaction tx = txManager.startShort();
+      txManager.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i }));
+      txManager.commit(tx.getTransactionId(), tx.getWritePointer());
+    }
+
+    // checkpoint the transactions
+    Transaction tx3c = txManager.checkpoint(tx3);
+    Transaction tx2c = txManager.checkpoint(tx2);
+    Transaction tx1c = txManager.checkpoint(tx1);
+
+    // start and commit a few (this moves the read pointer past all checkpoint write versions)
+    for (int i = 5; i < 10; i++) {
+      Transaction tx = txManager.startShort();
+      txManager.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i }));
+      txManager.commit(tx.getTransactionId(), tx.getWritePointer());
+    }
+
+    // start new tx and validate all write pointers are excluded
+    Transaction tx = txManager.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx1.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx1c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    txManager.abort(tx);
+
+    // abort one of the checkpoints
+    txManager.abort(tx1c);
+
+    // start new tx and validate all write pointers are excluded
+    tx = txManager.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    txManager.abort(tx);
+
+    // invalidate one of the checkpoints
+    txManager.invalidate(tx2c.getTransactionId());
+
+    // start new tx and validate all write pointers are excluded
+    tx = txManager.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    txManager.abort(tx);
+
+    // commit the last checkpoint
+    txManager.canCommit(tx3.getTransactionId(), Collections.<byte[]>emptyList());
+    txManager.commit(tx3c.getTransactionId(), tx3c.getWritePointer());
+
+    // start new tx and validate all write pointers are excluded
+    tx = txManager.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    txManager.abort(tx);
+  }
+
+  private void validateSorted(long[] array) {
+    Long lastSeen = null;
+    for (long value : array) {
+      Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)),
+                        lastSeen == null || lastSeen < value);
+      lastSeen = value;
+    }
+  }
+
+  @Test
   public void testTransactionCleanup() throws Exception {
     Configuration config = new Configuration(conf);
     config.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
@@ -98,8 +186,8 @@ public class TransactionManagerTest extends TransactionSystemTest {
       // start and commit a bunch of transactions
       for (int i = 0; i < 10; i++) {
         Transaction tx = txm.startShort();
-        Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
-        Assert.assertTrue(txm.commit(tx));
+        txm.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i }));
+        txm.commit(tx.getTransactionId(), tx.getWritePointer());
       }
       // all of these should still be in the committed set
       Assert.assertEquals(0, txm.getInvalidSize());
@@ -124,14 +212,14 @@ public class TransactionManagerTest extends TransactionSystemTest {
                                  tx2.getWritePointer(),
                                  tx2c.getWritePointer()}, txx.getInvalids());
       // try to commit the last transaction that was started
-      Assert.assertTrue(txm.canCommit(txx, Collections.singleton(new byte[] { 0x0a })));
-      Assert.assertTrue(txm.commit(txx));
+      txm.canCommit(txx.getTransactionId(), Collections.singleton(new byte[] { 0x0a }));
+      txm.commit(txx.getTransactionId(), txx.getWritePointer());
 
       // now the committed change sets should be empty again
       Assert.assertEquals(0, txm.getCommittedSize());
       // cannot commit transaction as it was timed out
       try {
-        txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
+        txm.canCommit(tx1.getTransactionId(), Collections.singleton(new byte[] { 0x11 }));
         Assert.fail();
       } catch (TransactionNotInProgressException e) {
         // expected
@@ -148,14 +236,14 @@ public class TransactionManagerTest extends TransactionSystemTest {
       // run another bunch of transactions
       for (int i = 0; i < 10; i++) {
         Transaction tx = txm.startShort();
-        Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
-        Assert.assertTrue(txm.commit(tx));
+        txm.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i }));
+        txm.commit(tx.getTransactionId(), tx.getWritePointer());
       }
       // none of these should still be in the committed set (tx2 is long-running).
       Assert.assertEquals(0, txm.getInvalidSize());
       Assert.assertEquals(0, txm.getCommittedSize());
       // commit tx2, abort tx3
-      Assert.assertTrue(txm.commit(ltx1));
+      txm.commit(ltx1.getTransactionId(), ltx1.getWritePointer());
       txm.abort(ltx2);
       // none of these should still be in the committed set (tx2 is long-running).
       // Only tx3 is invalid list as it was aborted and is long-running. tx1 is short one and it rolled back its changes
@@ -196,7 +284,7 @@ public class TransactionManagerTest extends TransactionSystemTest {
 
       // cannot commit transaction as it was timed out
       try {
-        txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
+        txm.canCommit(tx1.getTransactionId(), Collections.singleton(new byte[] { 0x11 }));
         Assert.fail();
       } catch (TransactionNotInProgressException e) {
         // expected

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
index 5448052..6cbda2f 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
@@ -90,7 +90,7 @@ public abstract class TransactionSystemTest {
     }
     Transaction tx = client.startShort();
     client.canCommitOrThrow(tx, fiftyChanges);
-    client.commit(tx);
+    client.commitOrThrow(tx);
 
     // now try another transaction with 51 changes
     fiftyChanges.add(new byte[] { 50 });
@@ -112,7 +112,7 @@ public abstract class TransactionSystemTest {
     }
     tx = client.startShort();
     client.canCommitOrThrow(tx, changes2k);
-    client.commit(tx);
+    client.commitOrThrow(tx);
 
     // now add another byte to the change set to exceed the limit
     changes2k.add(new byte[] { 0 });
@@ -134,14 +134,14 @@ public abstract class TransactionSystemTest {
     Transaction tx1 = client1.startShort();
     Transaction tx2 = client2.startShort();
 
-    Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1, C2)));
+    client1.canCommitOrThrow(tx1, asList(C1, C2));
     // second one also can commit even thought there are conflicts with first since first one hasn't committed yet
-    Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2, C3)));
+    client2.canCommitOrThrow(tx2, asList(C2, C3));
 
-    Assert.assertTrue(client1.commit(tx1));
+    client1.commitOrThrow(tx1);
 
     // now second one should not commit, since there are conflicts with tx1 that has been committed
-    Assert.assertFalse(client2.commit(tx2));
+    assertCommitConflicts(client2, tx2);
   }
 
   @Test
@@ -161,16 +161,16 @@ public abstract class TransactionSystemTest {
     Transaction tx4 = client4.startShort();
     Transaction tx5 = client5.startShort();
 
-    Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1)));
-    Assert.assertTrue(client1.commit(tx1));
+    client1.canCommitOrThrow(tx1, asList(C1));
+    client1.commitOrThrow(tx1);
 
-    Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2)));
-    Assert.assertTrue(client2.commit(tx2));
+    client2.canCommitOrThrow(tx2, asList(C2));
+    client2.commitOrThrow(tx2);
 
     // verifying conflicts detection
-    Assert.assertFalse(client3.canCommitOrThrow(tx3, asList(C1)));
-    Assert.assertFalse(client4.canCommitOrThrow(tx4, asList(C2)));
-    Assert.assertTrue(client5.canCommitOrThrow(tx5, asList(C3)));
+    assertCanCommitConflicts(client3, tx3, asList(C1));
+    assertCanCommitConflicts(client4, tx4, asList(C2));
+    client5.canCommitOrThrow(tx5, asList(C3));
   }
 
   @Test
@@ -178,15 +178,10 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
-    Assert.assertTrue(client.commit(tx));
+    client.canCommitOrThrow(tx, asList(C1, C2));
+    client.commitOrThrow(tx);
     // cannot commit twice same tx
-    try {
-      Assert.assertFalse(client.commit(tx));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
+    assertCommitNotInProgress(client, tx);
   }
 
   @Test
@@ -194,7 +189,7 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
+    client.canCommitOrThrow(tx, asList(C1, C2));
     client.abort(tx);
     // abort of not active tx has no affect
     client.abort(tx);
@@ -205,21 +200,12 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
-    Assert.assertTrue(client.commit(tx));
+    client.canCommitOrThrow(tx, asList(C1, C2));
+    client.commitOrThrow(tx);
+
     // can't re-use same tx again
-    try {
-      client.canCommitOrThrow(tx, asList(C3, C4));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
-    try {
-      Assert.assertFalse(client.commit(tx));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
+    assertCanCommitNotInProgress(client, tx, asList(C3, C4));
+    assertCommitNotInProgress(client, tx);
 
     // abort of not active tx has no affect
     client.abort(tx);
@@ -229,24 +215,15 @@ public abstract class TransactionSystemTest {
   public void testUseNotStarted() throws Exception {
     TransactionSystemClient client = getClient();
     Transaction tx1 = client.startShort();
-    Assert.assertTrue(client.commit(tx1));
+    client.commitOrThrow(tx1);
 
     // we know this is one is older than current writePointer and was not used
     Transaction txOld = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() - 1,
                                         new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, 
                                         TransactionType.SHORT);
-    try {
-      Assert.assertFalse(client.canCommitOrThrow(txOld, asList(C3, C4)));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
-    try {
-      Assert.assertFalse(client.commit(txOld));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
+    assertCanCommitNotInProgress(client, txOld, asList(C3, C4));
+    assertCommitNotInProgress(client, txOld);
+
     // abort of not active tx has no affect
     client.abort(txOld);
 
@@ -254,18 +231,9 @@ public abstract class TransactionSystemTest {
     Transaction txNew = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() + 1,
                                         new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, 
                                         TransactionType.SHORT);
-    try {
-      Assert.assertFalse(client.canCommitOrThrow(txNew, asList(C3, C4)));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
-    try {
-      Assert.assertFalse(client.commit(txNew));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
+    assertCanCommitNotInProgress(client, txNew, asList(C3, C4));
+    assertCommitNotInProgress(client, txNew);
+
     // abort of not active tx has no affect
     client.abort(txNew);
   }
@@ -275,8 +243,9 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
-    Assert.assertTrue(client.commit(tx));
+    client.canCommitOrThrow(tx, asList(C1, C2));
+    client.commitOrThrow(tx);
+
     // abort of not active tx has no affect
     client.abort(tx);
   }
@@ -292,7 +261,7 @@ public abstract class TransactionSystemTest {
     // Cannot invalidate a committed tx
     Transaction tx2 = client.startShort();
     client.canCommitOrThrow(tx2, asList(C3, C4));
-    client.commit(tx2);
+    client.commitOrThrow(tx2);
     Assert.assertFalse(client.invalidate(tx2.getTransactionId()));
   }
 
@@ -306,7 +275,7 @@ public abstract class TransactionSystemTest {
     Transaction tx1 = client.startShort();
     Transaction tx2 = client.startShort();
     client.canCommitOrThrow(tx1, asList(C1, C2));
-    client.commit(tx1);
+    client.commitOrThrow(tx1);
     client.canCommitOrThrow(tx2, asList(C3, C4));
 
     Transaction txPreReset = client.startShort();
@@ -409,8 +378,8 @@ public abstract class TransactionSystemTest {
     // start and commit a few
     for (int i = 0; i < 5; i++) {
       Transaction tx = client.startShort();
-      Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
-      Assert.assertTrue(client.commit(tx));
+      client.canCommitOrThrow(tx, Collections.singleton(new byte[] { (byte) i }));
+      client.commitOrThrow(tx);
     }
 
     // checkpoint the transactions
@@ -421,8 +390,8 @@ public abstract class TransactionSystemTest {
     // start and commit a few (this moves the read pointer past all checkpoint write versions)
     for (int i = 5; i < 10; i++) {
       Transaction tx = client.startShort();
-      Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
-      Assert.assertTrue(client.commit(tx));
+      client.canCommitOrThrow(tx, Collections.singleton(new byte[] { (byte) i }));
+      client.commitOrThrow(tx);
     }
 
     // start new tx and validate all write pointers are excluded
@@ -464,8 +433,8 @@ public abstract class TransactionSystemTest {
     client.abort(tx);
 
     // commit the last checkpoint
-    Assert.assertTrue(client.canCommit(tx3, Collections.<byte[]>emptyList()));
-    Assert.assertTrue(client.commit(tx3c));
+    client.canCommitOrThrow(tx3, Collections.<byte[]>emptyList());
+    client.commitOrThrow(tx3c);
 
     // start new tx and validate all write pointers are excluded
     tx = client.startShort();
@@ -485,6 +454,46 @@ public abstract class TransactionSystemTest {
     }
   }
 
+  private void assertCommitConflicts(TransactionSystemClient client, Transaction tx)
+    throws TransactionFailureException {
+    try {
+      client.commitOrThrow(tx);
+      Assert.fail();
+    } catch (TransactionConflictException e) {
+      //expected
+    }
+  }
+
+  private void assertCanCommitConflicts(TransactionSystemClient client, Transaction tx, Collection<byte[]> changes)
+    throws TransactionFailureException {
+    try {
+      client.canCommitOrThrow(tx, changes);
+      Assert.fail();
+    } catch (TransactionConflictException e) {
+      //expected
+    }
+  }
+
+  private void assertCommitNotInProgress(TransactionSystemClient client, Transaction tx)
+    throws TransactionFailureException {
+    try {
+      client.commitOrThrow(tx);
+      Assert.fail();
+    } catch (TransactionNotInProgressException e) {
+      //expected
+    }
+  }
+
+  private void assertCanCommitNotInProgress(TransactionSystemClient client, Transaction tx, Collection<byte[]> changes)
+    throws TransactionFailureException {
+    try {
+      client.canCommitOrThrow(tx, changes);
+      Assert.fail();
+    } catch (TransactionNotInProgressException e) {
+      //expected
+    }
+  }
+
   private Collection<byte[]> asList(byte[]... val) {
     return Arrays.asList(val);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
index 6075452..bd48f7a 100644
--- a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
@@ -197,7 +197,7 @@ public class ThriftTransactionServerTest {
     // simply start + commit transaction
     TransactionSystemClient txClient = getClient();
     Transaction tx = txClient.startShort();
-    txClient.commit(tx);
+    txClient.commitOrThrow(tx);
 
     // Expire zookeeper session, which causes Thrift server to stop running.
     expireZkSession(zkClientService);
@@ -215,7 +215,7 @@ public class ThriftTransactionServerTest {
     txClient = getClient();
     // verify that we can start and commit a transaction after becoming leader again
     tx = txClient.startShort();
-    txClient.commit(tx);
+    txClient.commitOrThrow(tx);
   }
 
   private void expireZkSession(ZKClientService zkClientService) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
index 971c93c..961d368 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
@@ -28,6 +28,7 @@ import it.unimi.dsi.fastutil.longs.LongArrayList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionType;
 import org.apache.tephra.TxConstants;
@@ -142,11 +143,11 @@ public abstract class AbstractTransactionStateStorageTest {
       txManager.invalidate(invalid.getTransactionId());
       // start a tx1, add a change A and commit
       Transaction tx1 = txManager.startShort("client1");
-      Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
-      Assert.assertTrue(txManager.commit(tx1));
+      txManager.canCommit(tx1.getTransactionId(), Collections.singleton(a));
+      txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
       // start a tx2 and add a change B
       Transaction tx2 = txManager.startShort("client2");
-      Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
+      txManager.canCommit(tx2.getTransactionId(), Collections.singleton(b));
       // start a tx3
       Transaction tx3 = txManager.startShort("client3");
       // restart
@@ -172,7 +173,7 @@ public abstract class AbstractTransactionStateStorageTest {
       txManager.abort(invalid);
 
       // commit tx2
-      Assert.assertTrue(txManager.commit(tx2));
+      txManager.commit(tx2.getTransactionId(), tx2.getWritePointer());
       // start another transaction, must be greater than tx3
       Transaction tx4 = txManager.startShort();
       Assert.assertTrue(tx4.getTransactionId() > tx3.getTransactionId());
@@ -181,7 +182,12 @@ public abstract class AbstractTransactionStateStorageTest {
       Assert.assertFalse(tx2.isVisible(tx3.getTransactionId()));
       Assert.assertFalse(tx2.isVisible(tx4.getTransactionId()));
       // add same change for tx3
-      Assert.assertFalse(txManager.canCommit(tx3, Collections.singleton(b)));
+      try {
+        txManager.canCommit(tx3.getTransactionId(), Collections.singleton(b));
+        Assert.fail("canCommit() should have failed");
+      } catch (TransactionConflictException e) {
+        // expected
+      }
       // check visibility with new xaction
       Transaction tx5 = txManager.startShort();
       Assert.assertTrue(tx5.isVisible(tx1.getTransactionId()));
@@ -252,11 +258,11 @@ public abstract class AbstractTransactionStateStorageTest {
       final byte[] b = { 'b' };
       // start a tx1, add a change A and commit
       Transaction tx1 = txManager.startShort();
-      Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
-      Assert.assertTrue(txManager.commit(tx1));
+      txManager.canCommit(tx1.getTransactionId(), Collections.singleton(a));
+      txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
       // start a tx2 and add a change B
       Transaction tx2 = txManager.startShort();
-      Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
+      txManager.canCommit(tx2.getTransactionId(), Collections.singleton(b));
       // start a tx3
       Transaction tx3 = txManager.startShort();
       TransactionSnapshot origState = txManager.getCurrentState();

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
index 9c565ba..98d1148 100644
--- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
@@ -364,8 +364,8 @@ public class SnapshotCodecTest {
     Assert.assertTrue(inProgressTx.getCheckpointWritePointers().isEmpty());
 
     // Should be able to commit the transaction
-    Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList()));
-    Assert.assertTrue(txManager.commit(checkpointTx));
+    txManager.canCommit(checkpointTx.getTransactionId(), Collections.<byte[]>emptyList());
+    txManager.commit(checkpointTx.getTransactionId(), checkpointTx.getWritePointer());
 
     // save a new snapshot
     txManager.stopAndWait();

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 3c7d1e2..9615d8e 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
@@ -89,7 +88,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -151,7 +149,7 @@ public class TransactionProcessorTest {
         // this will set visibility upper bound to V[6]
         Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
           V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
-        new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
+        new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>());
     txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
                                                 txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
                                                 txSnapshot.getInProgress());

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index d826bad..4d34ed9 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -37,7 +37,6 @@ import java.util.Map;
 import javax.annotation.Nullable;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /**
  * HBase 0.96 specific test for filtering logic applied when reading data transactionally.
@@ -250,20 +249,20 @@ public class TransactionVisibilityFilterTest extends AbstractTransactionVisibili
      */
 
     Transaction tx1 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx1));
+    txManager.canCommit(tx1.getTransactionId(), EMPTY_CHANGESET);
+    txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
 
     Transaction tx2 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx2));
+    txManager.canCommit(tx2.getTransactionId(), EMPTY_CHANGESET);
+    txManager.commit(tx2.getTransactionId(), tx2.getWritePointer());
 
     Transaction tx3 = txManager.startShort();
     Transaction tx4 = txManager.startShort();
     txManager.invalidate(tx4.getTransactionId());
 
     Transaction tx5 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx5));
+    txManager.canCommit(tx5.getTransactionId(), EMPTY_CHANGESET);
+    txManager.commit(tx5.getTransactionId(), tx5.getWritePointer());
 
     Transaction tx6 = txManager.startShort();
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index b8e051b..dcb8314 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
@@ -95,7 +94,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -156,7 +154,7 @@ public class TransactionProcessorTest {
       // this will set visibility upper bound to V[6]
       Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
         V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
-      new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
+      new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>());
     txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
                                                 txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
                                                 txSnapshot.getInProgress());

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index 7a57aac..3352eef 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -37,7 +37,6 @@ import java.util.Map;
 import javax.annotation.Nullable;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /**
  * HBase 0.98 specific test for filtering logic applied when reading data transactionally.
@@ -249,20 +248,20 @@ public class TransactionVisibilityFilterTest extends AbstractTransactionVisibili
      */
 
     Transaction tx1 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx1));
+    txManager.canCommit(tx1.getTransactionId(), EMPTY_CHANGESET);
+    txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
 
     Transaction tx2 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx2));
+    txManager.canCommit(tx2.getTransactionId(), EMPTY_CHANGESET);
+    txManager.commit(tx2.getTransactionId(), tx2.getWritePointer());
 
     Transaction tx3 = txManager.startShort();
     Transaction tx4 = txManager.startShort();
     txManager.invalidate(tx4.getTransactionId());
 
     Transaction tx5 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx5));
+    txManager.canCommit(tx5.getTransactionId(), EMPTY_CHANGESET);
+    txManager.commit(tx5.getTransactionId(), tx5.getWritePointer());
 
     Transaction tx6 = txManager.startShort();
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 9ce30b5..016adbd 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
@@ -77,7 +76,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
@@ -136,7 +134,7 @@ public class TransactionProcessorTest {
       // this will set visibility upper bound to V[6]
       Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
         V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
-      new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
+      new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>());
     txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
                                                 txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
                                                 txSnapshot.getInProgress());

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
index 5b9802d..c27a10d 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -37,7 +37,6 @@ import java.util.Map;
 import javax.annotation.Nullable;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /**
  * HBase 1.0 (CDH) specific test for filtering logic applied when reading data transactionally.
@@ -249,20 +248,20 @@ public class TransactionVisibilityFilterTest extends AbstractTransactionVisibili
      */
 
     Transaction tx1 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx1));
+    txManager.canCommit(tx1.getTransactionId(), EMPTY_CHANGESET);
+    txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
 
     Transaction tx2 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx2));
+    txManager.canCommit(tx2.getTransactionId(), EMPTY_CHANGESET);
+    txManager.commit(tx2.getTransactionId(), tx2.getWritePointer());
 
     Transaction tx3 = txManager.startShort();
     Transaction tx4 = txManager.startShort();
     txManager.invalidate(tx4.getTransactionId());
 
     Transaction tx5 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx5));
+    txManager.canCommit(tx5.getTransactionId(), EMPTY_CHANGESET);
+    txManager.commit(tx5.getTransactionId(), tx5.getWritePointer());
 
     Transaction tx6 = txManager.startShort();
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 0ec3b46..5328aef 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
@@ -77,7 +76,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
@@ -136,7 +134,7 @@ public class TransactionProcessorTest {
       // this will set visibility upper bound to V[6]
       Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
         V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
-      new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
+      new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>());
     txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
                                                 txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
                                                 txSnapshot.getInProgress());