You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:28 UTC

[28/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java
deleted file mode 100644
index 4a3af3c..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionType;
-import com.google.common.collect.Sets;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-/**
- * test for {@link TransactionEdit}
- */
-public class TransactionEditTest {
-  private static final byte[] COL = new byte[] {'c'};
-
-  @Test
-  public void testV1SerdeCompat() throws Exception {
-    TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV1();
-    // start tx edit and committed tx edit cover all fields of tx edit
-    // NOTE: set visibilityUpperBound to 0 and transaction type to null as this is expected default 
-    // for decoding older versions that doesn't store it
-    verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 0L, 1000L, null), olderCodec);
-    verifyDecodingSupportsOlderVersion(
-      TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec);
-  }
-  
-  @Test
-  public void testV2SerdeCompat() throws Exception {
-    TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV2();
-    // start tx edit and committed tx edit cover all fields of tx edit
-    // NOTE: transaction type to null as this is expected default for decoding older versions that doesn't store it
-    verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 100L, 1000L, null), olderCodec);
-    verifyDecodingSupportsOlderVersion(
-      TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec);
-  }
-
-  @SuppressWarnings("deprecation")
-  private void verifyDecodingSupportsOlderVersion(TransactionEdit edit, 
-                                                  TransactionEditCodecs.TransactionEditCodec olderCodec)
-    throws IOException {
-    // encoding with older version of codec
-    ByteArrayDataOutput out = ByteStreams.newDataOutput();
-    TransactionEditCodecs.encode(edit, out, olderCodec);
-
-    // decoding
-    TransactionEdit decodedEdit = new TransactionEdit();
-    DataInput in = ByteStreams.newDataInput(out.toByteArray());
-    decodedEdit.readFields(in);
-
-    Assert.assertEquals(edit, decodedEdit);
-  }
-
-  @Test
-  public void testSerialization() throws Exception {
-    assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[0]));
-    assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[]{ 2L, 3L }));
-    assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[0]));
-    assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[]{ 2L, 3L }));
-
-    assertSerializedEdit(TransactionEdit.createCheckpoint(2L, 1L));
-
-    assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, false));
-    assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, true));
-    assertSerializedEdit(TransactionEdit.createCommitted(1L,
-        Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'})), 2L, false));
-    assertSerializedEdit(TransactionEdit.createCommitted(1L,
-        Sets.newHashSet(new ChangeId(new byte[]{ 'a', 'b', 'c' }), new ChangeId(new byte[]{ 'd', 'e', 'f' })),
-        2L, true));
-
-    assertSerializedEdit(TransactionEdit.createCommitting(1L, Sets.<ChangeId>newHashSet()));
-    assertSerializedEdit(TransactionEdit.createCommitting(1L,
-        Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'}))));
-    assertSerializedEdit(TransactionEdit.createCommitting(1L,
-        Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'}), new ChangeId(new byte[]{'d', 'e', 'f'}))));
-
-    assertSerializedEdit(TransactionEdit.createInvalid(1L));
-
-    assertSerializedEdit(TransactionEdit.createMoveWatermark(10L));
-
-    assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 1000,
-        TransactionType.SHORT));
-    assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 10000,
-        TransactionType.LONG));
-
-    assertSerializedEdit(TransactionEdit.createTruncateInvalidTx(Sets.newHashSet(new Long(1))));
-    assertSerializedEdit(TransactionEdit.createTruncateInvalidTx(
-        Sets.newHashSet(new Long(1), new Long(2), new Long(3))));
-
-    assertSerializedEdit(TransactionEdit.createTruncateInvalidTxBefore(System.currentTimeMillis()));
-  }
-
-  private void assertSerializedEdit(TransactionEdit originalEdit) throws IOException {
-    ByteArrayDataOutput out = ByteStreams.newDataOutput();
-    originalEdit.write(out);
-
-    TransactionEdit decodedEdit = new TransactionEdit();
-    DataInput in = ByteStreams.newDataInput(out.toByteArray());
-    decodedEdit.readFields(in);
-
-    Assert.assertEquals(originalEdit, decodedEdit);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java
deleted file mode 100644
index f3fe2e1..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionNotInProgressException;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.persist.TransactionSnapshot;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.persist.TransactionVisibilityState;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionModules;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests related to {@link SnapshotCodec} implementations.
- */
-public class SnapshotCodecTest {
-  @ClassRule
-  public static TemporaryFolder tmpDir = new TemporaryFolder();
-
-  @Test
-  public void testMinimalDeserilization() throws Exception {
-    long now = System.currentTimeMillis();
-    long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
-    /*
-     * Snapshot consisting of transactions at:
-     */
-    long tInvalid = nowWritePointer - 5;    // t1 - invalid
-    long readPtr = nowWritePointer - 4;     // t2 - here and earlier committed
-    long tLong = nowWritePointer - 3;       // t3 - in-progress LONG
-    long tCommitted = nowWritePointer - 2;  // t4 - committed, changeset (r1, r2)
-    long tShort = nowWritePointer - 1;      // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
-
-    TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
-      tLong, new TransactionManager.InProgressTx(readPtr,
-                                                 TransactionManager.getTxExpirationFromWritePointer(
-                                                   tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
-                                                 TransactionType.LONG),
-      tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
-
-    TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
-                                                           Lists.newArrayList(tInvalid), // invalid
-                                                           inProgress, ImmutableMap.<Long, Set<ChangeId>>of(
-                                                             tShort, Sets.<ChangeId>newHashSet()),
-                                                           ImmutableMap.<Long, Set<ChangeId>>of(
-                                                             tCommitted, Sets.<ChangeId>newHashSet()));
-
-    Configuration conf1 = new Configuration();
-    conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
-    SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
-
-    byte[] byteArray;
-    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-      provider1.encode(out, snapshot);
-      byteArray = out.toByteArray();
-    }
-
-    // TransactionSnapshot and TransactionVisibilityState decode should pass now
-    TransactionSnapshot txSnapshot = provider1.decode(new ByteArrayInputStream(byteArray));
-    TransactionVisibilityState txVisibilityState =
-      provider1.decodeTransactionVisibilityState(new ByteArrayInputStream(byteArray));
-    assertTransactionVisibilityStateEquals(txSnapshot, txVisibilityState);
-
-    // Corrupt the serialization byte array so that full deserialization will fail
-    byteArray[byteArray.length - 1] = 'a';
-
-    // TransactionVisibilityState decoding should pass since it doesn't decode the committing and committed changesets.
-    TransactionVisibilityState txVisibilityState2 = provider1.decodeTransactionVisibilityState(
-      new ByteArrayInputStream(byteArray));
-    Assert.assertNotNull(txVisibilityState2);
-    Assert.assertEquals(txVisibilityState, txVisibilityState2);
-    Assert.assertEquals(readPtr, txVisibilityState2.getReadPointer());
-    try {
-      provider1.decode(new ByteArrayInputStream(byteArray));
-      Assert.fail();
-    } catch (RuntimeException e) {
-      // expected since we modified the serialization bytes
-    }
-  }
-
-  /**
-   * In-progress LONG transactions written with DefaultSnapshotCodec will not have the type serialized as part of
-   * the data.  Since these transactions also contain a non-negative expiration, we need to ensure we reset the type
-   * correctly when the snapshot is loaded.
-   */
-  @Test
-  public void testDefaultToV3Compatibility() throws Exception {
-    long now = System.currentTimeMillis();
-    long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
-    /*
-     * Snapshot consisting of transactions at:
-     */
-    long tInvalid = nowWritePointer - 5;    // t1 - invalid
-    long readPtr = nowWritePointer - 4;     // t2 - here and earlier committed
-    long tLong = nowWritePointer - 3;       // t3 - in-progress LONG
-    long tCommitted = nowWritePointer - 2;  // t4 - committed, changeset (r1, r2)
-    long tShort = nowWritePointer - 1;      // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
-
-    TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
-        tLong, new TransactionManager.InProgressTx(readPtr,
-            TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
-            TransactionType.LONG),
-        tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
-
-    TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
-        Lists.newArrayList(tInvalid), // invalid
-        inProgress,
-        ImmutableMap.<Long, Set<ChangeId>>of(
-            tShort, Sets.newHashSet(new ChangeId(new byte[]{'r', '3'}), new ChangeId(new byte[]{'r', '4'}))),
-        ImmutableMap.<Long, Set<ChangeId>>of(
-            tCommitted, Sets.newHashSet(new ChangeId(new byte[]{'r', '1'}), new ChangeId(new byte[]{'r', '2'}))));
-
-    Configuration conf1 = new Configuration();
-    conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
-    SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
-
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    try {
-      provider1.encode(out, snapshot);
-    } finally {
-      out.close();
-    }
-
-    TransactionSnapshot snapshot2 = provider1.decode(new ByteArrayInputStream(out.toByteArray()));
-    TransactionVisibilityState minTxSnapshot = provider1.decodeTransactionVisibilityState(
-      new ByteArrayInputStream(out.toByteArray()));
-    assertTransactionVisibilityStateEquals(snapshot2, minTxSnapshot);
-
-    assertEquals(snapshot.getReadPointer(), snapshot2.getReadPointer());
-    assertEquals(snapshot.getWritePointer(), snapshot2.getWritePointer());
-    assertEquals(snapshot.getInvalid(), snapshot2.getInvalid());
-    // in-progress transactions will have missing types
-    assertNotEquals(snapshot.getInProgress(), snapshot2.getInProgress());
-    assertEquals(snapshot.getCommittingChangeSets(), snapshot2.getCommittingChangeSets());
-    assertEquals(snapshot.getCommittedChangeSets(), snapshot2.getCommittedChangeSets());
-
-    // after fixing in-progress, full snapshot should match
-    Map<Long, TransactionManager.InProgressTx> fixedInProgress = TransactionManager.txnBackwardsCompatCheck(
-        TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT, 10000L, snapshot2.getInProgress());
-    assertEquals(snapshot.getInProgress(), fixedInProgress);
-    assertEquals(snapshot, snapshot2);
-  }
-
-  /**
-   * Test full stack serialization for a TransactionManager migrating from DefaultSnapshotCodec to SnapshotCodecV3.
-   */
-  @Test
-  public void testDefaultToV3Migration() throws Exception {
-    File testDir = tmpDir.newFolder("testDefaultToV3Migration");
-    Configuration conf = new Configuration();
-    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
-    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
-
-    Injector injector = Guice.createInjector(new ConfigModule(conf),
-        new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
-
-    TransactionManager txManager = injector.getInstance(TransactionManager.class);
-    txManager.startAndWait();
-
-    txManager.startLong();
-
-    // shutdown to force a snapshot
-    txManager.stopAndWait();
-
-    TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
-    txStorage.startAndWait();
-
-    // confirm that the in-progress entry is missing a type
-    TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
-    TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
-    assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
-    assertNotNull(snapshot);
-    assertEquals(1, snapshot.getInProgress().size());
-    Map.Entry<Long, TransactionManager.InProgressTx> entry =
-        snapshot.getInProgress().entrySet().iterator().next();
-    assertNull(entry.getValue().getType());
-    txStorage.stopAndWait();
-
-
-    // start a new Tx manager to test fixup
-    Configuration conf2 = new Configuration();
-    conf2.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
-    conf2.setStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES,
-                     DefaultSnapshotCodec.class.getName(), SnapshotCodecV3.class.getName());
-    Injector injector2 = Guice.createInjector(new ConfigModule(conf2),
-        new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
-
-    TransactionManager txManager2 = injector2.getInstance(TransactionManager.class);
-    txManager2.startAndWait();
-
-    // state should be recovered
-    TransactionSnapshot snapshot2 = txManager2.getCurrentState();
-    assertEquals(1, snapshot2.getInProgress().size());
-    Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx =
-        snapshot2.getInProgress().entrySet().iterator().next();
-    assertEquals(TransactionType.LONG, inProgressTx.getValue().getType());
-
-    // save a new snapshot
-    txManager2.stopAndWait();
-
-    TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
-    txStorage2.startAndWait();
-
-    TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot();
-    // full snapshot should have deserialized correctly without any fixups
-    assertEquals(snapshot2.getInProgress(), snapshot3.getInProgress());
-    assertEquals(snapshot2, snapshot3);
-    txStorage2.stopAndWait();
-  }
-
-  @Test
-  public void testSnapshotCodecProviderConfiguration() throws Exception {
-    Configuration conf = new Configuration(false);
-    StringBuilder buf = new StringBuilder();
-    for (Class c : TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES) {
-      if (buf.length() > 0) {
-        buf.append(",\n    ");
-      }
-      buf.append(c.getName());
-    }
-    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, buf.toString());
-
-    SnapshotCodecProvider codecProvider = new SnapshotCodecProvider(conf);
-    SnapshotCodec v1codec = codecProvider.getCodecForVersion(new DefaultSnapshotCodec().getVersion());
-    assertNotNull(v1codec);
-    assertTrue(v1codec instanceof DefaultSnapshotCodec);
-
-    SnapshotCodec v2codec = codecProvider.getCodecForVersion(new SnapshotCodecV2().getVersion());
-    assertNotNull(v2codec);
-    assertTrue(v2codec instanceof SnapshotCodecV2);
-
-    SnapshotCodec v3codec = codecProvider.getCodecForVersion(new SnapshotCodecV3().getVersion());
-    assertNotNull(v3codec);
-    assertTrue(v3codec instanceof SnapshotCodecV3);
-
-    SnapshotCodec v4codec = codecProvider.getCodecForVersion(new SnapshotCodecV4().getVersion());
-    assertNotNull(v4codec);
-    assertTrue(v4codec instanceof SnapshotCodecV4);
-  }
-
-  @Test
-  public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressException {
-    File testDir = tmpDir.newFolder("testSnapshotCodecV4");
-    Configuration conf = new Configuration();
-    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
-    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
-
-    Injector injector = Guice.createInjector(new ConfigModule(conf),
-                                             new DiscoveryModules().getSingleNodeModules(),
-                                             new TransactionModules().getSingleNodeModules());
-
-    TransactionManager txManager = injector.getInstance(TransactionManager.class);
-    txManager.startAndWait();
-
-    // Create a transaction and a checkpoint transaction
-    Transaction transaction = txManager.startLong();
-    Transaction checkpointTx = txManager.checkpoint(transaction);
-
-    // shutdown to force a snapshot
-    txManager.stopAndWait();
-
-    // Validate the snapshot on disk
-    TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
-    txStorage.startAndWait();
-
-    TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
-    TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
-    assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
-
-    Map<Long, TransactionManager.InProgressTx> inProgress = snapshot.getInProgress();
-    Assert.assertEquals(1, inProgress.size());
-
-    TransactionManager.InProgressTx inProgressTx = inProgress.get(transaction.getTransactionId());
-    Assert.assertNotNull(inProgressTx);
-    Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
-                             inProgressTx.getCheckpointWritePointers().toLongArray());
-
-    txStorage.stopAndWait();
-
-    // start a new Tx manager to see if the transaction is restored correctly.
-    Injector injector2 = Guice.createInjector(new ConfigModule(conf),
-                                              new DiscoveryModules().getSingleNodeModules(),
-                                              new TransactionModules().getSingleNodeModules());
-
-    txManager = injector2.getInstance(TransactionManager.class);
-    txManager.startAndWait();
-
-    // state should be recovered
-    snapshot = txManager.getCurrentState();
-    inProgress = snapshot.getInProgress();
-    Assert.assertEquals(1, inProgress.size());
-
-    inProgressTx = inProgress.get(transaction.getTransactionId());
-    Assert.assertNotNull(inProgressTx);
-    Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
-                             inProgressTx.getCheckpointWritePointers().toLongArray());
-
-    // Should be able to commit the transaction
-    Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList()));
-    Assert.assertTrue(txManager.commit(checkpointTx));
-
-    // save a new snapshot
-    txManager.stopAndWait();
-
-    TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
-    txStorage2.startAndWait();
-
-    snapshot = txStorage2.getLatestSnapshot();
-    Assert.assertTrue(snapshot.getInProgress().isEmpty());
-    txStorage2.stopAndWait();
-  }
-
-  private void assertTransactionVisibilityStateEquals(TransactionVisibilityState expected,
-                                                      TransactionVisibilityState input) {
-    Assert.assertEquals(expected.getTimestamp(), input.getTimestamp());
-    Assert.assertEquals(expected.getReadPointer(), input.getReadPointer());
-    Assert.assertEquals(expected.getWritePointer(), input.getWritePointer());
-    Assert.assertEquals(expected.getInProgress(), input.getInProgress());
-    Assert.assertEquals(expected.getInvalid(), input.getInvalid());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java b/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java
deleted file mode 100644
index 0ae9ed4..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- *
- */
-public abstract class AbstractConfigurationProviderTest {
-  @Test
-  public void testVersionFactory() {
-    HBaseVersion.Version foundVersion = HBaseVersion.get();
-    assertEquals(getExpectedVersion(), foundVersion);
-  }
-
-  protected abstract HBaseVersion.Version getExpectedVersion();
-
-  @Test
-  public void testConfigurationProvider() {
-    Configuration conf = new Configuration();
-    conf.set("foo", "bar");
-    Configuration newConf = new ConfigurationFactory().get(conf);
-    assertNotNull(newConf);
-    assertEquals("bar", newConf.get("foo"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java b/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java
deleted file mode 100644
index 1bb1fe2..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import org.junit.Test;
-
-import java.text.ParseException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for HBase version parsing.
- */
-public class HBaseVersionTest {
-  @Test
-  public void testVersionNumber() throws Exception {
-    HBaseVersion.VersionNumber ver = HBaseVersion.VersionNumber.create("1");
-    assertVersionNumber(ver, 1, null, null, null, false);
-
-    ver = HBaseVersion.VersionNumber.create("1-SNAPSHOT");
-    assertVersionNumber(ver, 1, null, null, null, true);
-
-    ver = HBaseVersion.VersionNumber.create("1-foo");
-    assertVersionNumber(ver, 1, null, null, "foo", false);
-
-    ver = HBaseVersion.VersionNumber.create("1-foo-SNAPSHOT");
-    assertVersionNumber(ver, 1, null, null, "foo", true);
-
-    ver = HBaseVersion.VersionNumber.create("10.0");
-    assertVersionNumber(ver, 10, 0, null, null, false);
-
-    ver = HBaseVersion.VersionNumber.create("10.0-bar");
-    assertVersionNumber(ver, 10, 0, null, "bar", false);
-
-    ver = HBaseVersion.VersionNumber.create("3.2.1");
-    assertVersionNumber(ver, 3, 2, 1, null, false);
-
-    ver = HBaseVersion.VersionNumber.create("3.2.1-SNAPSHOT");
-    assertVersionNumber(ver, 3, 2, 1, null, true);
-
-    ver = HBaseVersion.VersionNumber.create("3.2.1-baz");
-    assertVersionNumber(ver, 3, 2, 1, "baz", false);
-
-    ver = HBaseVersion.VersionNumber.create("3.2.1-baz1.2.3");
-    assertVersionNumber(ver, 3, 2, 1, "baz1.2.3", false);
-
-    ver = HBaseVersion.VersionNumber.create("3.2.1-baz1.2.3-SNAPSHOT");
-    assertVersionNumber(ver, 3, 2, 1, "baz1.2.3", true);
-
-    try {
-      ver = HBaseVersion.VersionNumber.create("abc");
-      fail("Invalid verison number 'abc' should have thrown a ParseException");
-    } catch (ParseException pe) {
-      // expected
-    }
-
-    try {
-      ver = HBaseVersion.VersionNumber.create("1.a.b");
-      fail("Invalid verison number '1.a.b' should have thrown a ParseException");
-    } catch (ParseException pe) {
-      // expected
-    }
-
-    ver = HBaseVersion.VersionNumber.create("1.2.0-CDH5.7.0");
-    assertVersionNumber(ver, 1, 2, 0, "CDH5.7.0", false);
-  }
-
-  private void assertVersionNumber(HBaseVersion.VersionNumber version, Integer expectedMajor, Integer expectedMinor,
-                                   Integer expectedPatch, String expectedClassifier, boolean snapshot) {
-    if (expectedMajor == null) {
-      assertNull(version.getMajor());
-    } else {
-      assertEquals(expectedMajor, version.getMajor());
-    }
-    if (expectedMinor == null) {
-      assertNull(version.getMinor());
-    } else {
-      assertEquals(expectedMinor, version.getMinor());
-    }
-    if (expectedPatch == null) {
-      assertNull(version.getPatch());
-    } else {
-      assertEquals(expectedPatch, version.getPatch());
-    }
-    if (expectedClassifier == null) {
-      assertNull(version.getClassifier());
-    } else {
-      assertEquals(expectedClassifier, version.getClassifier());
-    }
-    assertEquals(snapshot, version.isSnapshot());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java b/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java
deleted file mode 100644
index c3269b0..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.persist.TransactionEdit;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * Util class for {@link TransactionEdit} related tests.
- */
-public final class TransactionEditUtil {
-  private static Random random = new Random();
-
-  /**
-   * Generates a number of semi-random {@link TransactionEdit} instances.
-   * These are just randomly selected from the possible states, so would not necessarily reflect a real-world
-   * distribution.
-   *
-   * @param numEntries how many entries to generate in the returned list.
-   * @return a list of randomly generated transaction log edits.
-   */
-  public static List<TransactionEdit> createRandomEdits(int numEntries) {
-    List<TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries);
-    for (int i = 0; i < numEntries; i++) {
-      TransactionEdit.State nextType = TransactionEdit.State.values()[random.nextInt(6)];
-      long writePointer = Math.abs(random.nextLong());
-      switch (nextType) {
-        case INPROGRESS:
-          edits.add(
-            TransactionEdit.createStarted(writePointer, writePointer - 1,
-                                          System.currentTimeMillis() + 300000L, TransactionType.SHORT));
-          break;
-        case COMMITTING:
-          edits.add(TransactionEdit.createCommitting(writePointer, generateChangeSet(10)));
-          break;
-        case COMMITTED:
-          edits.add(TransactionEdit.createCommitted(writePointer, generateChangeSet(10), writePointer + 1,
-                                                    random.nextBoolean()));
-          break;
-        case INVALID:
-          edits.add(TransactionEdit.createInvalid(writePointer));
-          break;
-        case ABORTED:
-          edits.add(TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null));
-          break;
-        case MOVE_WATERMARK:
-          edits.add(TransactionEdit.createMoveWatermark(writePointer));
-          break;
-      }
-    }
-    return edits;
-  }
-
-  private static Set<ChangeId> generateChangeSet(int numEntries) {
-    Set<ChangeId> changes = Sets.newHashSet();
-    for (int i = 0; i < numEntries; i++) {
-      byte[] bytes = new byte[8];
-      random.nextBytes(bytes);
-      changes.add(new ChangeId(bytes));
-    }
-    return changes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java
deleted file mode 100644
index b8188e0..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import co.cask.tephra.Transaction;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test cases for {@link TxUtils} utility methods.
- */
-public class TxUtilsTest {
-  @Test
-  public void testMaxVisibleTimestamp() {
-    // make sure we don't overflow with MAX_VALUE write pointer
-    assertEquals(Long.MAX_VALUE, TxUtils.getMaxVisibleTimestamp(Transaction.ALL_VISIBLE_LATEST));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java b/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java
deleted file mode 100644
index 750fe28..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionConflictException;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-import com.google.common.base.Charsets;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * The following are all the possible cases when using {@link VisibilityFence}.
- *
- * In the below table,
- * "Read Txn" refers to the transaction that contains the read fence
- * "Before Write", "During Write" and "After Write" refer to the write transaction time
- * "Before Write Fence", "During Write Fence", "After Write Fence" refer to the write fence transaction time
- *
- * Timeline is: Before Write < During Write < After Write < Before Write Fence < During Write Fence <
- *              After Write Fence
- *
- * +------+----------------------+----------------------+--------------------+--------------------+
- * | Case |    Read Txn Start    |   Read Txn Commit    | Conflict on Commit | Conflict on Commit |
- * |      |                      |                      | of Read Txn        | of Write Fence     |
- * +------+----------------------+----------------------+--------------------+--------------------+
- * |    1 | Before Write         | Before Write         | No                 | No                 |
- * |    2 | Before Write         | During Write         | No                 | No                 |
- * |    3 | Before Write         | After Write          | No                 | No                 |
- * |    4 | Before Write         | Before Write Fence   | No                 | No                 |
- * |    5 | Before Write         | During Write Fence   | No                 | Yes                |
- * |    6 | Before Write         | After Write Fence    | Yes                | No                 |
- * |      |                      |                      |                    |                    |
- * |    7 | During Write         | During Write         | No                 | No                 |
- * |    8 | During Write         | After Write          | No                 | No                 |
- * |    9 | During Write         | Before Write Fence   | No                 | No                 |
- * |   10 | During Write         | During Write Fence   | No                 | Yes                |
- * |   11 | During Write         | After Write Fence    | Yes                | No                 |
- * |      |                      |                      |                    |                    |
- * |   12 | After Write          | After Write          | No                 | No                 |
- * |   13 | After Write          | Before Write Fence   | No                 | No                 |
- * |   14 | After Write          | During Write Fence   | No                 | Yes #              |
- * |   15 | After Write          | After Write Fence    | Yes #              | No                 |
- * |      |                      |                      |                    |                    |
- * |   16 | Before Write Fence   | Before Write Fence   | No                 | No                 |
- * |   17 | Before Write Fence   | During Write Fence   | No                 | Yes #              |
- * |   18 | Before Write Fence   | After Write Fence    | Yes #              | No                 |
- * |      |                      |                      |                    |                    |
- * |   19 | During Write Fence   | During Write Fence   | No                 | No                 |
- * |   20 | During Write Fence   | After Write Fence    | No                 | No                 |
- * |      |                      |                      |                    |                    |
- * |   21 | After Write Fence    | After Write Fence    | No                 | No                 |
- * +------+----------------------+----------------------+--------------------+--------------------+
- *
- * Note: Cases marked with '#' in conflict column should not conflict, however current implementation causes
- * them to conflict. The remaining conflicts are a result of the fence.
- *
- * In the current implementation of VisibilityFence, read txns that start "Before Write", "During Write",
- * and "After Write" can be represented by read txns that start "Before Write Fence".
- * Verifying cases 16, 17, 18, 20 and 21 will effectively cover all other cases.
- */
-public class VisibilityFenceTest {
-  private static Configuration conf = new Configuration();
-
-  private static TransactionManager txManager = null;
-
-  @BeforeClass
-  public static void before() {
-    txManager = new TransactionManager(conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
-    txManager.startAndWait();
-  }
-
-  @AfterClass
-  public static void after() {
-    txManager.stopAndWait();
-  }
-
-  @Test
-  public void testFence1() throws Exception {
-    byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
-
-    // Writer updates data here in a separate transaction (code not shown)
-    // start tx
-    // update
-    // commit tx
-
-    // Readers use fence to indicate that they are interested in changes to specific data
-    TransactionAware readFenceCase16 = VisibilityFence.create(fenceId);
-    TransactionContext readTxContextCase16 =
-      new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase16);
-    readTxContextCase16.start();
-    readTxContextCase16.finish();
-
-    TransactionAware readFenceCase17 = VisibilityFence.create(fenceId);
-    TransactionContext readTxContextCase17 =
-      new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase17);
-    readTxContextCase17.start();
-
-    TransactionAware readFenceCase18 = VisibilityFence.create(fenceId);
-    TransactionContext readTxContextCase18 =
-      new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase18);
-    readTxContextCase18.start();
-
-    // Now writer needs to wait for in-progress readers to see the change, it uses write fence to do so
-    // Start write fence txn
-    TransactionAware writeFence = new WriteFence(fenceId);
-    TransactionContext writeTxContext = new TransactionContext(new InMemoryTxSystemClient(txManager), writeFence);
-    writeTxContext.start();
-
-    TransactionAware readFenceCase20 = VisibilityFence.create(fenceId);
-    TransactionContext readTxContextCase20 =
-      new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase20);
-    readTxContextCase20.start();
-
-    readTxContextCase17.finish();
-
-    assertTxnConflict(writeTxContext);
-    writeTxContext.start();
-
-    // Commit write fence txn can commit without conflicts at this point
-    writeTxContext.finish();
-
-    TransactionAware readFenceCase21 = VisibilityFence.create(fenceId);
-    TransactionContext readTxContextCase21 =
-      new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase21);
-    readTxContextCase21.start();
-
-    assertTxnConflict(readTxContextCase18);
-    readTxContextCase20.finish();
-    readTxContextCase21.finish();
-  }
-
-  private void assertTxnConflict(TransactionContext txContext) throws Exception {
-    try {
-      txContext.finish();
-      Assert.fail("Expected transaction to fail");
-    } catch (TransactionConflictException e) {
-      // Expected
-      txContext.abort();
-    }
-  }
-
-  @Test
-  public void testFence2() throws Exception {
-    byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
-
-    // Readers use fence to indicate that they are interested in changes to specific data
-    // Reader 1
-    TransactionAware readFence1 = VisibilityFence.create(fenceId);
-    TransactionContext readTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence1);
-    readTxContext1.start();
-
-    // Reader 2
-    TransactionAware readFence2 = VisibilityFence.create(fenceId);
-    TransactionContext readTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence2);
-    readTxContext2.start();
-
-    // Reader 3
-    TransactionAware readFence3 = VisibilityFence.create(fenceId);
-    TransactionContext readTxContext3 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence3);
-    readTxContext3.start();
-
-    // Writer updates data here in a separate transaction (code not shown)
-    // start tx
-    // update
-    // commit tx
-
-    // Now writer needs to wait for readers 1, 2, and 3 to see the change, it uses write fence to do so
-    TransactionAware writeFence = new WriteFence(fenceId);
-    TransactionContext writeTxContext = new TransactionContext(new InMemoryTxSystemClient(txManager), writeFence);
-    writeTxContext.start();
-
-    // Reader 1 commits before writeFence is committed
-    readTxContext1.finish();
-
-    try {
-      // writeFence will throw exception since Reader 1 committed without seeing changes
-      writeTxContext.finish();
-      Assert.fail("Expected transaction to fail");
-    } catch (TransactionConflictException e) {
-      // Expected
-      writeTxContext.abort();
-    }
-
-    // Start over writeFence again
-    writeTxContext.start();
-
-    // Now, Reader 3 commits before writeFence
-    // Note that Reader 3 does not conflict with Reader 1
-    readTxContext3.finish();
-
-    try {
-      // writeFence will throw exception again since Reader 3 committed without seeing changes
-      writeTxContext.finish();
-      Assert.fail("Expected transaction to fail");
-    } catch (TransactionConflictException e) {
-      // Expected
-      writeTxContext.abort();
-    }
-
-    // Start over writeFence again
-    writeTxContext.start();
-    // This time writeFence commits before the other readers
-    writeTxContext.finish();
-
-    // After this point all readers will see the change
-
-    try {
-      // Reader 2 commits after writeFence, hence this commit with throw exception
-      readTxContext2.finish();
-      Assert.fail("Expected transaction to fail");
-    } catch (TransactionConflictException e) {
-      // Expected
-      readTxContext2.abort();
-    }
-
-    // Reader 2 has to abort and start over again. It will see the changes now.
-    readTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence2);
-    readTxContext2.start();
-    readTxContext2.finish();
-  }
-
-  @Test
-  public void testFenceAwait() throws Exception {
-    byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
-
-    final TransactionContext fence1 = new TransactionContext(new InMemoryTxSystemClient(txManager),
-                                                       VisibilityFence.create(fenceId));
-    fence1.start();
-    final TransactionContext fence2 = new TransactionContext(new InMemoryTxSystemClient(txManager),
-                                                       VisibilityFence.create(fenceId));
-    fence2.start();
-    TransactionContext fence3 = new TransactionContext(new InMemoryTxSystemClient(txManager),
-                                                       VisibilityFence.create(fenceId));
-    fence3.start();
-
-    final AtomicInteger attempts = new AtomicInteger();
-    TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) {
-      @Override
-      public Transaction startShort() {
-        Transaction transaction = super.startShort();
-        try {
-          switch (attempts.getAndIncrement()) {
-            case 0:
-              fence1.finish();
-              break;
-            case 1:
-              fence2.finish();
-              break;
-            case 2:
-              break;
-            default:
-              throw new IllegalStateException("Unexpected state");
-          }
-        } catch (TransactionFailureException e) {
-          Throwables.propagate(e);
-        }
-        return transaction;
-      }
-    };
-
-    FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient);
-    fenceWait.await(1000, TimeUnit.MILLISECONDS);
-    Assert.assertEquals(3, attempts.get());
-
-    try {
-      fence3.finish();
-      Assert.fail("Expected transaction to fail");
-    } catch (TransactionConflictException e) {
-      // Expected exception
-      fence3.abort();
-    }
-
-    fence3.start();
-    fence3.finish();
-  }
-
-  @Test
-  public void testFenceTimeout() throws Exception {
-    byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
-
-    final TransactionContext fence1 = new TransactionContext(new InMemoryTxSystemClient(txManager),
-                                                             VisibilityFence.create(fenceId));
-    fence1.start();
-
-    final long timeout = 100;
-    final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
-    final AtomicInteger attempts = new AtomicInteger();
-    TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) {
-      @Override
-      public Transaction startShort() {
-        Transaction transaction = super.startShort();
-        try {
-          switch (attempts.getAndIncrement()) {
-            case 0:
-              fence1.finish();
-              break;
-          }
-          timeUnit.sleep(timeout + 1);
-        } catch (InterruptedException | TransactionFailureException e) {
-          Throwables.propagate(e);
-        }
-        return transaction;
-      }
-    };
-
-    try {
-      FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient);
-      fenceWait.await(timeout, timeUnit);
-      Assert.fail("Expected await to fail");
-    } catch (TimeoutException e) {
-      // Expected exception
-    }
-    Assert.assertEquals(1, attempts.get());
-
-    FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient);
-    fenceWait.await(timeout, timeUnit);
-    Assert.assertEquals(2, attempts.get());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
new file mode 100644
index 0000000..28000ff
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThriftTransactionSystemTest extends TransactionSystemTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);
+  
+  private static InMemoryZKServer zkServer;
+  private static ZKClientService zkClientService;
+  private static TransactionService txService;
+  private static TransactionStateStorage storage;
+  private static TransactionSystemClient txClient;
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+  
+  @BeforeClass
+  public static void start() throws Exception {
+    zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+    zkServer.startAndWait();
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+    conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
+    conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+    conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+
+    Injector injector = Guice.createInjector(
+      new ConfigModule(conf),
+      new ZKModule(),
+      new DiscoveryModules().getDistributedModules(),
+      Modules.override(new TransactionModules().getDistributedModules())
+        .with(new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+          }
+        }),
+      new TransactionClientModule()
+    );
+
+    zkClientService = injector.getInstance(ZKClientService.class);
+    zkClientService.startAndWait();
+
+    // start a tx server
+    txService = injector.getInstance(TransactionService.class);
+    storage = injector.getInstance(TransactionStateStorage.class);
+    txClient = injector.getInstance(TransactionSystemClient.class);
+    try {
+      LOG.info("Starting transaction service");
+      txService.startAndWait();
+    } catch (Exception e) {
+      LOG.error("Failed to start service: ", e);
+    }
+  }
+  
+  @Before
+  public void reset() throws Exception {
+    getClient().resetState();
+  }
+  
+  @AfterClass
+  public static void stop() throws Exception {
+    txService.stopAndWait();
+    storage.stopAndWait();
+    zkClientService.stopAndWait();
+    zkServer.stopAndWait();
+  }
+  
+  @Override
+  protected TransactionSystemClient getClient() throws Exception {
+    return txClient;
+  }
+
+  @Override
+  protected TransactionStateStorage getStateStorage() throws Exception {
+    return storage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java
new file mode 100644
index 0000000..9305229
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.concurrent.TimeUnit;
+
+public class TransactionAdminTest {
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionAdminTest.class);
+  
+  private static Configuration conf;
+  private static InMemoryZKServer zkServer;
+  private static ZKClientService zkClientService;
+  private static TransactionService txService;
+  private static TransactionSystemClient txClient;
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void start() throws Exception {
+    zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+    zkServer.startAndWait();
+
+    conf = new Configuration();
+    conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+    conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
+    conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+    conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+
+    Injector injector = Guice.createInjector(
+      new ConfigModule(conf),
+      new ZKModule(),
+      new DiscoveryModules().getDistributedModules(),
+      Modules.override(new TransactionModules().getDistributedModules())
+        .with(new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+          }
+        }),
+      new TransactionClientModule()
+    );
+
+    zkClientService = injector.getInstance(ZKClientService.class);
+    zkClientService.startAndWait();
+
+    // start a tx server
+    txService = injector.getInstance(TransactionService.class);
+    txClient = injector.getInstance(TransactionSystemClient.class);
+    try {
+      LOG.info("Starting transaction service");
+      txService.startAndWait();
+    } catch (Exception e) {
+      LOG.error("Failed to start service: ", e);
+    }
+  }
+
+  @Before
+  public void reset() throws Exception {
+    txClient.resetState();
+  }
+
+  @AfterClass
+  public static void stop() throws Exception {
+    txService.stopAndWait();
+    zkClientService.stopAndWait();
+    zkServer.stopAndWait();
+  }
+
+  @Test
+  public void testPrintUsage() throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ByteArrayOutputStream err = new ByteArrayOutputStream();
+    TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(out), new PrintStream(err));
+    int status = txAdmin.doMain(new String[0], conf);
+    Assert.assertEquals(1, status);
+    //noinspection ConstantConditions
+    Assert.assertTrue(err.toString("UTF-8").startsWith("Usage:"));
+    Assert.assertEquals(0, out.toByteArray().length);
+  }
+  
+  @Test
+  public void testTruncateInvalidTx() throws Exception {
+    Transaction tx1 = txClient.startLong();
+    Transaction tx2 = txClient.startShort();
+    txClient.invalidate(tx1.getTransactionId());
+    txClient.invalidate(tx2.getTransactionId());
+    Assert.assertEquals(2, txClient.getInvalidSize());
+
+    TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(System.out), new PrintStream(System.err));
+    int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx", String.valueOf(tx2.getTransactionId())}, conf);
+    Assert.assertEquals(0, status);
+    Assert.assertEquals(1, txClient.getInvalidSize());
+  }
+
+  @Test
+  public void testTruncateInvalidTxBefore() throws Exception {
+    Transaction tx1 = txClient.startLong();
+    TimeUnit.MILLISECONDS.sleep(1);
+    long beforeTx2 = System.currentTimeMillis();
+    Transaction tx2 = txClient.startLong();
+
+    // Try before invalidation
+    Assert.assertEquals(0, txClient.getInvalidSize());
+    TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(System.out), new PrintStream(System.err));
+    int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf);
+    // Assert command failed due to in-progress transactions
+    Assert.assertEquals(1, status);
+    // Assert no change to invalid size
+    Assert.assertEquals(0, txClient.getInvalidSize());
+
+    txClient.invalidate(tx1.getTransactionId());
+    txClient.invalidate(tx2.getTransactionId());
+    Assert.assertEquals(2, txClient.getInvalidSize());
+
+    status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf);
+    Assert.assertEquals(0, status);
+    Assert.assertEquals(1, txClient.getInvalidSize());
+  }
+
+  @Test
+  public void testGetInvalidTxSize() throws Exception {
+    Transaction tx1 = txClient.startShort();
+    txClient.startLong();
+    txClient.invalidate(tx1.getTransactionId());
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ByteArrayOutputStream err = new ByteArrayOutputStream();
+    TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(out), new PrintStream(err));
+    int status = txAdmin.doMain(new String[]{"--get-invalid-tx-size"}, conf);
+    Assert.assertEquals(0, status);
+    //noinspection ConstantConditions
+    Assert.assertTrue(out.toString("UTF-8").contains("Invalid list size: 1\n"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
new file mode 100644
index 0000000..20f6944
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
@@ -0,0 +1,676 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.snapshot.SnapshotCodecV4;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Tests {@link TransactionContext}: commit, rollback, abort and invalidation under induced failures.
+ */
+public class TransactionContextTest {
+  private static DummyTxClient txClient;
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  /**
+   * Builds the Guice injector once for the whole class and stores the injected {@link DummyTxClient}
+   * in {@link #txClient}. The in-memory transaction modules are overridden so that a
+   * {@link TransactionManager} created and started here is bound, and {@link TransactionSystemClient}
+   * resolves to a singleton {@link DummyTxClient}.
+   *
+   * @throws IOException if the temporary snapshot folder cannot be created
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    final Configuration conf = new Configuration();
+    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+
+    // bindings layered on top of the in-memory transaction modules
+    AbstractModule testBindings = new AbstractModule() {
+      @Override
+      protected void configure() {
+        // the manager is created and started while the injector is being configured
+        TransactionManager txManager = new TransactionManager(conf);
+        txManager.startAndWait();
+        bind(TransactionManager.class).toInstance(txManager);
+        bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class);
+      }
+    };
+
+    Injector injector = Guice.createInjector(
+      new ConfigModule(conf),
+      new DiscoveryModules().getInMemoryModules(),
+      Modules.override(new TransactionModules().getInMemoryModules()).with(testBindings));
+
+    txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class);
+  }
+
+  final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
+
+  static final byte[] A = { 'a' };
+  static final byte[] B = { 'b' };
+
+  /**
+   * Creates a {@link TransactionContext} that uses the shared {@link #txClient}.
+   *
+   * @param txAwares the participants handed to the new context; may be empty
+   * @return the newly created context
+   */
+  private static TransactionContext newTransactionContext(TransactionAware... txAwares) {
+    TransactionContext context = new TransactionContext(txClient, txAwares);
+    return context;
+  }
+
+  /** Clears the recorded lifecycle flags and changes of both dummy participants before each test. */
+  @Before
+  public void resetTxAwares() {
+    for (DummyTxAware txAware : new DummyTxAware[] { ds1, ds2 }) {
+      txAware.reset();
+    }
+  }
+
+  /**
+   * Runs a transaction over ds1 and ds2 with no induced failure and expects both participants to be
+   * started, checked, committed and post-committed, neither to be rolled back, and the client to end
+   * in the Committed state.
+   */
+  @Test
+  public void testSuccessful() throws TransactionFailureException, InterruptedException {
+    TransactionContext context = newTransactionContext(ds1, ds2);
+    // start transaction
+    context.start();
+    // add a change to ds1 and ds2
+    ds1.addChange(A);
+    ds2.addChange(B);
+    // commit transaction
+    context.finish();
+    // verify both are committed and post-committed
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertTrue(ds2.committed);
+    Assert.assertTrue(ds1.postCommitted);
+    Assert.assertTrue(ds2.postCommitted);
+    Assert.assertFalse(ds1.rolledBack);
+    Assert.assertFalse(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Committed, txClient.state);
+  }
+
+  /**
+   * Makes ds1 throw from {@code postTxCommit()} and expects {@code finish()} to fail with a
+   * {@link TransactionFailureException} whose cause carries the message "post failure". Because the
+   * failure happens after the commit, nothing is rolled back and the client stays Committed.
+   */
+  @Test
+  public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
+    ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
+    TransactionContext context = newTransactionContext(ds1, ds2);
+    // start transaction
+    context.start();
+    // add a change to ds1 and ds2
+    ds1.addChange(A);
+    ds2.addChange(B);
+    // commit transaction should fail but without rollback as the failure happens post-commit
+    try {
+      context.finish();
+      Assert.fail("post commit failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("post failure", e.getCause().getMessage());
+    }
+    // verify both are committed and post-committed
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertTrue(ds2.committed);
+    Assert.assertTrue(ds1.postCommitted);
+    Assert.assertTrue(ds2.postCommitted);
+    Assert.assertFalse(ds1.rolledBack);
+    Assert.assertFalse(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Committed, txClient.state);
+  }
+
+  /**
+   * Makes ds1 throw from {@code commitTx()} and expects {@code finish()} to fail with a
+   * {@link TransactionFailureException} whose cause carries the message "persist failure". ds2 must
+   * never be committed, both participants must be rolled back and the client must end up Aborted.
+   */
+  @Test
+  public void testPersistFailure() throws TransactionFailureException, InterruptedException {
+    ds1.failCommitTxOnce = InduceFailure.ThrowException;
+    TransactionContext context = newTransactionContext(ds1, ds2);
+    // start transaction
+    context.start();
+    // add a change to ds1 and ds2
+    ds1.addChange(A);
+    ds2.addChange(B);
+    // commit transaction should fail and cause rollback
+    try {
+      context.finish();
+      Assert.fail("Persist should have failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("persist failure", e.getCause().getMessage());
+    }
+    // verify both are rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+  }
+
+  /**
+   * Makes ds1 return {@code false} from {@code commitTx()} and expects {@code finish()} to fail with
+   * a {@link TransactionFailureException} that has no cause. ds2 must never be committed, both
+   * participants must be rolled back and the client must end up Aborted.
+   */
+  @Test
+  public void testPersistFalse() throws TransactionFailureException, InterruptedException {
+    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+    TransactionContext context = newTransactionContext(ds1, ds2);
+    // start transaction
+    context.start();
+    // add a change to ds1 and ds2
+    ds1.addChange(A);
+    ds2.addChange(B);
+    // commit transaction should fail and cause rollback
+    try {
+      context.finish();
+      Assert.fail("Persist should have failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
+    }
+    // verify both are rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+  }
+
+  /**
+   * Makes ds1 throw from both {@code commitTx()} and {@code rollbackTx()}. {@code finish()} must fail
+   * with the "persist failure" cause, rollback must still be attempted on both participants, and the
+   * client must end up Invalidated rather than Aborted.
+   */
+  @Test
+  public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+    ds1.failCommitTxOnce = InduceFailure.ThrowException;
+    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+    TransactionContext context = newTransactionContext(ds1, ds2);
+    // start transaction
+    context.start();
+    // add a change to ds1 and ds2
+    ds1.addChange(A);
+    ds2.addChange(B);
+    // commit transaction should fail and cause rollback
+    try {
+      context.finish();
+      Assert.fail("Persist should have failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("persist failure", e.getCause().getMessage());
+    }
+    // verify both are rolled back and tx is invalidated
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Invalidated, txClient.state);
+  }
+
+  /**
+   * Makes ds1 return {@code false} from both {@code commitTx()} and {@code rollbackTx()}.
+   * {@code finish()} must fail with a cause-less {@link TransactionFailureException}, rollback must
+   * still be attempted on both participants, and the client must end up Invalidated.
+   */
+  @Test
+  public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
+    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+    ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+    TransactionContext context = newTransactionContext(ds1, ds2);
+    // start transaction
+    context.start();
+    // add a change to ds1 and ds2
+    ds1.addChange(A);
+    ds2.addChange(B);
+    // commit transaction should fail and cause rollback
+    try {
+      context.finish();
+      Assert.fail("Persist should have failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
+    }
+    // verify both are rolled back and tx is invalidated
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Invalidated, txClient.state);
+  }
+
+  /**
+   * Arms the client to reject one {@code commit()} call and expects {@code finish()} to fail with a
+   * cause-less {@link TransactionConflictException}. Both participants have already been committed
+   * at that point, so both must be rolled back, and the client must end up Aborted.
+   */
+  @Test
+  public void testCommitFalse() throws TransactionFailureException, InterruptedException {
+    txClient.failCommits = 1;
+    TransactionContext context = newTransactionContext(ds1, ds2);
+    // start transaction
+    context.start();
+    // add a change to ds1 and ds2
+    ds1.addChange(A);
+    ds2.addChange(B);
+    // commit transaction should fail and cause rollback
+    try {
+      context.finish();
+      Assert.fail("commit failed - exception should be thrown");
+    } catch (TransactionConflictException e) {
+      Assert.assertNull(e.getCause());
+    }
+    // verify both are rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertTrue(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+  }
+
+  /**
+   * Arms the client to reject one {@code canCommit()} call and expects {@code finish()} to fail with
+   * a cause-less {@link TransactionConflictException}. The changes of both participants have been
+   * collected but neither has been committed; both must be rolled back and the client Aborted.
+   */
+  @Test
+  public void testCanCommitFalse() throws TransactionFailureException, InterruptedException {
+    txClient.failCanCommitOnce = true;
+    TransactionContext context = newTransactionContext(ds1, ds2);
+    // start transaction
+    context.start();
+    // add a change to ds1 and ds2
+    ds1.addChange(A);
+    ds2.addChange(B);
+    // commit transaction should fail and cause rollback
+    try {
+      context.finish();
+      Assert.fail("commit failed - exception should be thrown");
+    } catch (TransactionConflictException e) {
+      Assert.assertNull(e.getCause());
+    }
+    // verify both are rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertFalse(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+  }
+
+  /**
+   * Makes ds1 throw from both {@code getTxChanges()} and {@code rollbackTx()}. {@code finish()} must
+   * fail with the "changes failure" cause before ds2 is asked for its changes, rollback must be
+   * attempted on both participants, and the client must end up Invalidated.
+   */
+  @Test
+  public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+    ds1.failChangesTxOnce = InduceFailure.ThrowException;
+    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+    TransactionContext context = newTransactionContext(ds1, ds2);
+    // start transaction
+    context.start();
+    // add a change to ds1 and ds2
+    ds1.addChange(A);
+    ds2.addChange(B);
+    // commit transaction should fail and cause rollback
+    try {
+      context.finish();
+      Assert.fail("get changes failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("changes failure", e.getCause().getMessage());
+    }
+    // verify both are rolled back and tx is invalidated
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertFalse(ds2.checked);
+    Assert.assertFalse(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Invalidated, txClient.state);
+  }
+
+  /**
+   * Makes ds1 throw from {@code startTx()} and expects {@code start()} to fail with a
+   * {@link TransactionFailureException} whose cause carries the message "start failure". ds2 must
+   * never be started, no participant must be rolled back, and the client must end up Aborted.
+   */
+  @Test
+  public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+    ds1.failStartTxOnce = InduceFailure.ThrowException;
+    TransactionContext context = newTransactionContext(ds1, ds2);
+    // start transaction
+    try {
+      context.start();
+      Assert.fail("start failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("start failure", e.getCause().getMessage());
+    }
+    // verify both are not rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertFalse(ds2.started);
+    Assert.assertFalse(ds1.checked);
+    Assert.assertFalse(ds2.checked);
+    Assert.assertFalse(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertFalse(ds1.rolledBack);
+    Assert.assertFalse(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+  }
+
+  /**
+   * Starts a transaction with only ds1, adds ds2 to the context while the transaction is running,
+   * and expects both participants to go through the full successful lifecycle with the client
+   * ending in the Committed state.
+   */
+  @Test
+  public void testAddThenSuccess() throws TransactionFailureException, InterruptedException {
+    TransactionContext context = newTransactionContext(ds1);
+    // start transaction
+    context.start();
+    // add a change to ds1
+    ds1.addChange(A);
+    // add ds2 to the tx
+    context.addTransactionAware(ds2);
+    // add a change to ds2
+    ds2.addChange(B);
+    // commit transaction
+    context.finish();
+    // verify both are committed and post-committed
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertTrue(ds2.committed);
+    Assert.assertTrue(ds1.postCommitted);
+    Assert.assertTrue(ds2.postCommitted);
+    Assert.assertFalse(ds1.rolledBack);
+    Assert.assertFalse(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Committed, txClient.state);
+  }
+
+  /**
+   * Starts a transaction with only ds1, adds ds2 (armed to throw from {@code commitTx()}) while the
+   * transaction is running, and expects {@code finish()} to fail with the "persist failure" cause.
+   * Both participants have been committed by then, so both must be rolled back and the client Aborted.
+   */
+  @Test
+  public void testAddThenFailure() throws TransactionFailureException, InterruptedException {
+    ds2.failCommitTxOnce = InduceFailure.ThrowException;
+
+    TransactionContext context = newTransactionContext(ds1);
+    // start transaction
+    context.start();
+    // add a change to ds1
+    ds1.addChange(A);
+    // add ds2 to the tx
+    context.addTransactionAware(ds2);
+    // add a change to ds2
+    ds2.addChange(B);
+    // commit transaction should fail and cause rollback
+    try {
+      context.finish();
+      Assert.fail("Persist should have failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("persist failure", e.getCause().getMessage());
+    }
+    // verify both are rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertTrue(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+  }
+
+  /**
+   * Verifies that a participant added during a transaction cannot be removed while that transaction
+   * is active ({@link IllegalStateException}), can be removed once it has finished, and that removing
+   * a participant that was never added returns {@code false}. ds1 must be committed; ds2 untouched.
+   */
+  @Test
+  public void testAddThenRemoveSuccess() throws TransactionFailureException {
+    TransactionContext context = newTransactionContext();
+
+    context.start();
+    Assert.assertTrue(context.addTransactionAware(ds1));
+    ds1.addChange(A);
+
+    try {
+      context.removeTransactionAware(ds1);
+      Assert.fail("Removal of TransactionAware should fails when there is active transaction.");
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    context.finish();
+
+    Assert.assertTrue(context.removeTransactionAware(ds1));
+    // Removing a TransactionAware that was not added before should return false
+    Assert.assertFalse(context.removeTransactionAware(ds2));
+
+    // Verify ds1 is committed and post-committed
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertTrue(ds1.postCommitted);
+    Assert.assertFalse(ds1.rolledBack);
+
+    // Verify nothing happened to ds2
+    Assert.assertFalse(ds2.started);
+    Assert.assertFalse(ds2.checked);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertFalse(ds2.rolledBack);
+
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Committed, txClient.state);
+  }
+
+  /**
+   * Adds ds1 (armed to throw from {@code commitTx()}) to a running transaction, expects
+   * {@code finish()} to fail with the "persist failure" cause, and then verifies that ds1 can be
+   * removed from the context, has been rolled back, and that the client ended up Aborted.
+   */
+  @Test
+  public void testAndThenRemoveOnFailure() throws TransactionFailureException {
+    ds1.failCommitTxOnce = InduceFailure.ThrowException;
+    TransactionContext context = newTransactionContext();
+
+    context.start();
+    Assert.assertTrue(context.addTransactionAware(ds1));
+    ds1.addChange(A);
+
+    try {
+      context.finish();
+      Assert.fail("Persist should have failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("persist failure", e.getCause().getMessage());
+    }
+
+    // the failed transaction is no longer active, so removal is allowed
+    Assert.assertTrue(context.removeTransactionAware(ds1));
+
+    // Verify ds1 is rolled back
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+
+    // expected value first, so that a failure message reports expected/actual the right way round
+    Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+  }
+
+  /** How an armed call on a dummy participant should fail: not at all, by returning false, or by throwing. */
+  enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
+
+  /**
+   * A {@link TransactionAware} that records which lifecycle callbacks were invoked and that can be
+   * armed to fail a single invocation of a given callback, either by throwing a
+   * {@link RuntimeException} or (for the boolean callbacks) by returning {@code false}.
+   */
+  static class DummyTxAware implements TransactionAware {
+
+    Transaction tx;
+    // flags recording which callbacks have run since the last reset()
+    boolean started = false;
+    boolean committed = false;
+    boolean checked = false;
+    boolean rolledBack = false;
+    boolean postCommitted = false;
+    List<byte[]> changes = Lists.newArrayList();
+
+    // one-shot failure switches; an armed switch goes back to NoFailure once it has fired
+    InduceFailure failStartTxOnce = InduceFailure.NoFailure;
+    InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
+    InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
+    InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
+    InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
+
+    /** Records {@code key} as a change to be reported by {@link #getTxChanges()}. */
+    void addChange(byte[] key) {
+      changes.add(key);
+    }
+
+    /** Forgets the current transaction, all lifecycle flags and all recorded changes. */
+    void reset() {
+      tx = null;
+      started = checked = committed = rolledBack = postCommitted = false;
+      changes.clear();
+    }
+
+    @Override
+    public void startTx(Transaction tx) {
+      reset();
+      this.tx = tx;
+      started = true;
+      if (failStartTxOnce != InduceFailure.ThrowException) {
+        return;
+      }
+      failStartTxOnce = InduceFailure.NoFailure;
+      throw new RuntimeException("start failure");
+    }
+
+    @Override
+    public void updateTx(Transaction tx) {
+      this.tx = tx;
+    }
+
+    @Override
+    public Collection<byte[]> getTxChanges() {
+      checked = true;
+      if (failChangesTxOnce != InduceFailure.ThrowException) {
+        return ImmutableList.copyOf(changes);
+      }
+      failChangesTxOnce = InduceFailure.NoFailure;
+      throw new RuntimeException("changes failure");
+    }
+
+    @Override
+    public boolean commitTx() throws Exception {
+      committed = true;
+      final InduceFailure induced = failCommitTxOnce;
+      final boolean fail = induced == InduceFailure.ThrowException || induced == InduceFailure.ReturnFalse;
+      if (fail) {
+        // the switch fires only once
+        failCommitTxOnce = InduceFailure.NoFailure;
+      }
+      if (induced == InduceFailure.ThrowException) {
+        throw new RuntimeException("persist failure");
+      }
+      return !fail;
+    }
+
+    @Override
+    public void postTxCommit() {
+      postCommitted = true;
+      if (failPostCommitTxOnce != InduceFailure.ThrowException) {
+        return;
+      }
+      failPostCommitTxOnce = InduceFailure.NoFailure;
+      throw new RuntimeException("post failure");
+    }
+
+    @Override
+    public boolean rollbackTx() throws Exception {
+      rolledBack = true;
+      final InduceFailure induced = failRollbackTxOnce;
+      final boolean fail = induced == InduceFailure.ThrowException || induced == InduceFailure.ReturnFalse;
+      if (fail) {
+        // the switch fires only once
+        failRollbackTxOnce = InduceFailure.NoFailure;
+      }
+      if (induced == InduceFailure.ThrowException) {
+        throw new RuntimeException("rollback failure");
+      }
+      return !fail;
+    }
+
+    @Override
+    public String getTransactionAwareName() {
+      return "dummy";
+    }
+  }
+
+  /**
+   * An {@link InMemoryTxSystemClient} that remembers in {@link #state} which kind of call (start,
+   * commit, abort, invalidate) was made last, and that can be armed to reject one
+   * {@code canCommit()} call or a given number of {@code commit()} calls by returning {@code false}.
+   */
+  static class DummyTxClient extends InMemoryTxSystemClient {
+
+    // when true, the next canCommit() returns false and the flag is cleared
+    boolean failCanCommitOnce = false;
+    // number of upcoming commit() calls that should return false
+    int failCommits = 0;
+    enum CommitState {
+      Started, Committed, Aborted, Invalidated
+    }
+    CommitState state = CommitState.Started;
+
+    @Inject
+    DummyTxClient(TransactionManager txmgr) {
+      super(txmgr);
+    }
+
+    @Override
+    public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
+      if (failCanCommitOnce) {
+        failCanCommitOnce = false;
+        return false;
+      } else {
+        return super.canCommit(tx, changeIds);
+      }
+    }
+
+    @Override
+    public boolean commit(Transaction tx) throws TransactionNotInProgressException {
+      // Only consume a pending failure while one is armed, so that the counter never drifts below
+      // zero on ordinary commits.
+      if (failCommits > 0) {
+        failCommits--;
+        return false;
+      } else {
+        state = CommitState.Committed;
+        return super.commit(tx);
+      }
+    }
+
+    @Override
+    public Transaction startLong() {
+      state = CommitState.Started;
+      return super.startLong();
+    }
+
+    @Override
+    public Transaction startShort() {
+      state = CommitState.Started;
+      return super.startShort();
+    }
+
+    @Override
+    public Transaction startShort(int timeout) {
+      state = CommitState.Started;
+      return super.startShort(timeout);
+    }
+
+    @Override
+    public void abort(Transaction tx) {
+      state = CommitState.Aborted;
+      super.abort(tx);
+    }
+
+    @Override
+    public boolean invalidate(long tx) {
+      state = CommitState.Invalidated;
+      return super.invalidate(tx);
+    }
+  }
+}