You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:30 UTC

[30/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java
deleted file mode 100644
index 419bffb..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java
+++ /dev/null
@@ -1,676 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.snapshot.SnapshotCodecV4;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-import com.google.inject.util.Modules;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Tests the transaction executor.
- */
-public class TransactionContextTest {
-  private static DummyTxClient txClient;
-
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @BeforeClass
-  public static void setup() throws IOException {
-    final Configuration conf = new Configuration();
-    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
-    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
-    Injector injector = Guice.createInjector(
-      new ConfigModule(conf),
-      new DiscoveryModules().getInMemoryModules(),
-      Modules.override(
-        new TransactionModules().getInMemoryModules()).with(new AbstractModule() {
-        @Override
-        protected void configure() {
-          TransactionManager txManager = new TransactionManager(conf);
-          txManager.startAndWait();
-          bind(TransactionManager.class).toInstance(txManager);
-          bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class);
-        }
-      }));
-
-    txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class);
-  }
-
-  final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
-
-  static final byte[] A = { 'a' };
-  static final byte[] B = { 'b' };
-
-  private static TransactionContext newTransactionContext(TransactionAware... txAwares) {
-    return new TransactionContext(txClient, txAwares);
-  }
-
-  @Before
-  public void resetTxAwares() {
-    ds1.reset();
-    ds2.reset();
-  }
-
-  @Test
-  public void testSuccessful() throws TransactionFailureException, InterruptedException {
-    TransactionContext context = newTransactionContext(ds1, ds2);
-    // start transaction
-    context.start();
-    // add a change to ds1 and ds2
-    ds1.addChange(A);
-    ds2.addChange(B);
-    // commit transaction
-    context.finish();
-    // verify both are committed and post-committed
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertTrue(ds1.postCommitted);
-    Assert.assertTrue(ds2.postCommitted);
-    Assert.assertFalse(ds1.rolledBack);
-    Assert.assertFalse(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
-  }
-
-  @Test
-  public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
-    TransactionContext context = newTransactionContext(ds1, ds2);
-    // start transaction
-    context.start();
-    // add a change to ds1 and ds2
-    ds1.addChange(A);
-    ds2.addChange(B);
-    // commit transaction should fail but without rollback as the failure happens post-commit
-    try {
-      context.finish();
-      Assert.fail("post commit failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("post failure", e.getCause().getMessage());
-    }
-    // verify both are committed and post-committed
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertTrue(ds1.postCommitted);
-    Assert.assertTrue(ds2.postCommitted);
-    Assert.assertFalse(ds1.rolledBack);
-    Assert.assertFalse(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
-  }
-
-  @Test
-  public void testPersistFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ThrowException;
-    TransactionContext context = newTransactionContext(ds1, ds2);
-    // start transaction
-    context.start();
-    // add a change to ds1 and ds2
-    ds1.addChange(A);
-    ds2.addChange(B);
-    // commit transaction should fail and cause rollback
-    try {
-      context.finish();
-      Assert.fail("Persist should have failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("persist failure", e.getCause().getMessage());
-    }
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  @Test
-  public void testPersistFalse() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
-    TransactionContext context = newTransactionContext(ds1, ds2);
-    // start transaction
-    context.start();
-    // add a change to ds1 and ds2
-    ds1.addChange(A);
-    ds2.addChange(B);
-    // commit transaction should fail and cause rollback
-    try {
-      context.finish();
-      Assert.fail("Persist should have failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
-    }
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  @Test
-  public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ThrowException;
-    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
-    TransactionContext context = newTransactionContext(ds1, ds2);
-    // start transaction
-    context.start();
-    // add a change to ds1 and ds2
-    ds1.addChange(A);
-    ds2.addChange(B);
-    // commit transaction should fail and cause rollback
-    try {
-      context.finish();
-      Assert.fail("Persist should have failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("persist failure", e.getCause().getMessage());
-    }
-    // verify both are rolled back and tx is invalidated
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
-  }
-
-  @Test
-  public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
-    ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
-    TransactionContext context = newTransactionContext(ds1, ds2);
-    // start transaction
-    context.start();
-    // add a change to ds1 and ds2
-    ds1.addChange(A);
-    ds2.addChange(B);
-    // commit transaction should fail and cause rollback
-    try {
-      context.finish();
-      Assert.fail("Persist should have failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
-    }
-    // verify both are rolled back and tx is invalidated
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
-  }
-
-  @Test
-  public void testCommitFalse() throws TransactionFailureException, InterruptedException {
-    txClient.failCommits = 1;
-    TransactionContext context = newTransactionContext(ds1, ds2);
-    // start transaction
-    context.start();
-    // add a change to ds1 and ds2
-    ds1.addChange(A);
-    ds2.addChange(B);
-    // commit transaction should fail and cause rollback
-    try {
-      context.finish();
-      Assert.fail("commit failed - exception should be thrown");
-    } catch (TransactionConflictException e) {
-      Assert.assertNull(e.getCause());
-    }
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  @Test
-  public void testCanCommitFalse() throws TransactionFailureException, InterruptedException {
-    txClient.failCanCommitOnce = true;
-    TransactionContext context = newTransactionContext(ds1, ds2);
-    // start transaction
-    context.start();
-    // add a change to ds1 and ds2
-    ds1.addChange(A);
-    ds2.addChange(B);
-    // commit transaction should fail and cause rollback
-    try {
-      context.finish();
-      Assert.fail("commit failed - exception should be thrown");
-    } catch (TransactionConflictException e) {
-      Assert.assertNull(e.getCause());
-    }
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertFalse(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  @Test
-  public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failChangesTxOnce = InduceFailure.ThrowException;
-    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
-    TransactionContext context = newTransactionContext(ds1, ds2);
-    // start transaction
-    context.start();
-    // add a change to ds1 and ds2
-    ds1.addChange(A);
-    ds2.addChange(B);
-    // commit transaction should fail and cause rollback
-    try {
-      context.finish();
-      Assert.fail("get changes failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("changes failure", e.getCause().getMessage());
-    }
-    // verify both are rolled back and tx is invalidated
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertFalse(ds2.checked);
-    Assert.assertFalse(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
-  }
-
-  @Test
-  public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failStartTxOnce = InduceFailure.ThrowException;
-    TransactionContext context = newTransactionContext(ds1, ds2);
-    // start transaction
-    try {
-      context.start();
-      Assert.fail("start failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("start failure", e.getCause().getMessage());
-    }
-    // verify both are not rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertFalse(ds2.started);
-    Assert.assertFalse(ds1.checked);
-    Assert.assertFalse(ds2.checked);
-    Assert.assertFalse(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertFalse(ds1.rolledBack);
-    Assert.assertFalse(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  @Test
-  public void testAddThenSuccess() throws TransactionFailureException, InterruptedException {
-    TransactionContext context = newTransactionContext(ds1);
-    // start transaction
-    context.start();
-    // add a change to ds1
-    ds1.addChange(A);
-    // add ds2 to the tx
-    context.addTransactionAware(ds2);
-    // add a change to ds2
-    ds2.addChange(B);
-    // commit transaction
-    context.finish();
-    // verify both are committed and post-committed
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertTrue(ds1.postCommitted);
-    Assert.assertTrue(ds2.postCommitted);
-    Assert.assertFalse(ds1.rolledBack);
-    Assert.assertFalse(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
-  }
-
-  @Test
-  public void testAddThenFailure() throws TransactionFailureException, InterruptedException {
-    ds2.failCommitTxOnce = InduceFailure.ThrowException;
-
-    TransactionContext context = newTransactionContext(ds1);
-    // start transaction
-    context.start();
-    // add a change to ds1
-    ds1.addChange(A);
-    // add ds2 to the tx
-    context.addTransactionAware(ds2);
-    // add a change to ds2
-    ds2.addChange(B);
-    // commit transaction should fail and cause rollback
-    try {
-      context.finish();
-      Assert.fail("Persist should have failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("persist failure", e.getCause().getMessage());
-    }
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  @Test
-  public void testAddThenRemoveSuccess() throws TransactionFailureException {
-    TransactionContext context = newTransactionContext();
-
-    context.start();
-    Assert.assertTrue(context.addTransactionAware(ds1));
-    ds1.addChange(A);
-
-    try {
-      context.removeTransactionAware(ds1);
-      Assert.fail("Removal of TransactionAware should fails when there is active transaction.");
-    } catch (IllegalStateException e) {
-      // Expected
-    }
-
-    context.finish();
-
-    Assert.assertTrue(context.removeTransactionAware(ds1));
-    // Removing a TransactionAware not added before should returns false
-    Assert.assertFalse(context.removeTransactionAware(ds2));
-
-    // Verify ds1 is committed and post-committed
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds1.postCommitted);
-    Assert.assertFalse(ds1.rolledBack);
-
-    // Verify nothing happen to ds2
-    Assert.assertFalse(ds2.started);
-    Assert.assertFalse(ds2.checked);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertFalse(ds2.rolledBack);
-
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
-  }
-
-  @Test
-  public void testAndThenRemoveOnFailure() throws TransactionFailureException {
-    ds1.failCommitTxOnce = InduceFailure.ThrowException;
-    TransactionContext context = newTransactionContext();
-
-    context.start();
-    Assert.assertTrue(context.addTransactionAware(ds1));
-    ds1.addChange(A);
-
-    try {
-      context.finish();
-      Assert.fail("Persist should have failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("persist failure", e.getCause().getMessage());
-    }
-
-    Assert.assertTrue(context.removeTransactionAware(ds1));
-
-    // Verify ds1 is rolled back
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
-
-  static class DummyTxAware implements TransactionAware {
-
-    Transaction tx;
-    boolean started = false;
-    boolean committed = false;
-    boolean checked = false;
-    boolean rolledBack = false;
-    boolean postCommitted = false;
-    List<byte[]> changes = Lists.newArrayList();
-
-    InduceFailure failStartTxOnce = InduceFailure.NoFailure;
-    InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
-    InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
-    InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
-    InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
-
-    void addChange(byte[] key) {
-      changes.add(key);
-    }
-
-    void reset() {
-      tx = null;
-      started = false;
-      checked = false;
-      committed = false;
-      rolledBack = false;
-      postCommitted = false;
-      changes.clear();
-    }
-
-    @Override
-    public void startTx(Transaction tx) {
-      reset();
-      started = true;
-      this.tx = tx;
-      if (failStartTxOnce == InduceFailure.ThrowException) {
-        failStartTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("start failure");
-      }
-    }
-
-    @Override
-    public void updateTx(Transaction tx) {
-      this.tx = tx;
-    }
-
-    @Override
-    public Collection<byte[]> getTxChanges() {
-      checked = true;
-      if (failChangesTxOnce == InduceFailure.ThrowException) {
-        failChangesTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("changes failure");
-      }
-      return ImmutableList.copyOf(changes);
-    }
-
-    @Override
-    public boolean commitTx() throws Exception {
-      committed = true;
-      if (failCommitTxOnce == InduceFailure.ThrowException) {
-        failCommitTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("persist failure");
-      }
-      if (failCommitTxOnce == InduceFailure.ReturnFalse) {
-        failCommitTxOnce = InduceFailure.NoFailure;
-        return false;
-      }
-      return true;
-    }
-
-    @Override
-    public void postTxCommit() {
-      postCommitted = true;
-      if (failPostCommitTxOnce == InduceFailure.ThrowException) {
-        failPostCommitTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("post failure");
-      }
-    }
-
-    @Override
-    public boolean rollbackTx() throws Exception {
-      rolledBack = true;
-      if (failRollbackTxOnce == InduceFailure.ThrowException) {
-        failRollbackTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("rollback failure");
-      }
-      if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
-        failRollbackTxOnce = InduceFailure.NoFailure;
-        return false;
-      }
-      return true;
-    }
-
-    @Override
-    public String getTransactionAwareName() {
-      return "dummy";
-    }
-  }
-
-  static class DummyTxClient extends InMemoryTxSystemClient {
-
-    boolean failCanCommitOnce = false;
-    int failCommits = 0;
-    enum CommitState {
-      Started, Committed, Aborted, Invalidated
-    }
-    CommitState state = CommitState.Started;
-
-    @Inject
-    DummyTxClient(TransactionManager txmgr) {
-      super(txmgr);
-    }
-
-    @Override
-    public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
-      if (failCanCommitOnce) {
-        failCanCommitOnce = false;
-        return false;
-      } else {
-        return super.canCommit(tx, changeIds);
-      }
-    }
-
-    @Override
-    public boolean commit(Transaction tx) throws TransactionNotInProgressException {
-      if (failCommits-- > 0) {
-        return false;
-      } else {
-        state = CommitState.Committed;
-        return super.commit(tx);
-      }
-    }
-
-    @Override
-    public Transaction startLong() {
-      state = CommitState.Started;
-      return super.startLong();
-    }
-
-    @Override
-    public Transaction startShort() {
-      state = CommitState.Started;
-      return super.startShort();
-    }
-
-    @Override
-    public Transaction startShort(int timeout) {
-      state = CommitState.Started;
-      return super.startShort(timeout);
-    }
-
-    @Override
-    public void abort(Transaction tx) {
-      state = CommitState.Aborted;
-      super.abort(tx);
-    }
-
-    @Override
-    public boolean invalidate(long tx) {
-      state = CommitState.Invalidated;
-      return super.invalidate(tx);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java
deleted file mode 100644
index 7e67f2c..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java
+++ /dev/null
@@ -1,590 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.snapshot.DefaultSnapshotCodec;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-import com.google.inject.util.Modules;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * Tests the transaction executor.
- */
-public class TransactionExecutorTest {
-  private static DummyTxClient txClient;
-  private static TransactionExecutorFactory factory;
-
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @BeforeClass
-  public static void setup() throws IOException {
-    final Configuration conf = new Configuration();
-    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
-    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
-    Injector injector = Guice.createInjector(
-      new ConfigModule(conf),
-      new DiscoveryModules().getInMemoryModules(),
-      Modules.override(
-        new TransactionModules().getInMemoryModules()).with(new AbstractModule() {
-        @Override
-        protected void configure() {
-          TransactionManager txManager = new TransactionManager(conf);
-          txManager.startAndWait();
-          bind(TransactionManager.class).toInstance(txManager);
-          bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class);
-        }
-      }));
-
-    txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class);
-    factory = injector.getInstance(TransactionExecutorFactory.class);
-  }
-
-  final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
-  final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2);
-
-  private TransactionExecutor getExecutor() {
-    return factory.createExecutor(txAwares);
-  }
-
-  private TransactionExecutor getExecutorWithNoRetry() {
-    return new DefaultTransactionExecutor(txClient, txAwares, RetryStrategies.noRetries());
-  }
-
-  static final byte[] A = { 'a' };
-  static final byte[] B = { 'b' };
-
-  final TransactionExecutor.Function<Integer, Integer> testFunction =
-    new TransactionExecutor.Function<Integer, Integer>() {
-      @Override
-      public Integer apply(@Nullable Integer input) {
-        ds1.addChange(A);
-        ds2.addChange(B);
-        if (input == null) {
-          throw new RuntimeException("function failed");
-        }
-        return input * input;
-      }
-  };
-
-  @Before
-  public void resetTxAwares() {
-    ds1.reset();
-    ds2.reset();
-  }
-
-  @Test
-  public void testSuccessful() throws TransactionFailureException, InterruptedException {
-    // execute: add a change to ds1 and ds2
-    Integer result = getExecutor().execute(testFunction, 10);
-    // verify both are committed and post-committed
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertTrue(ds1.postCommitted);
-    Assert.assertTrue(ds2.postCommitted);
-    Assert.assertFalse(ds1.rolledBack);
-    Assert.assertFalse(ds2.rolledBack);
-    Assert.assertTrue(100 == result);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
-  }
-
-  @Test
-  public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("post commit failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("post failure", e.getCause().getMessage());
-    }
-    // verify both are committed and post-committed
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertTrue(ds1.postCommitted);
-    Assert.assertTrue(ds2.postCommitted);
-    Assert.assertFalse(ds1.rolledBack);
-    Assert.assertFalse(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
-  }
-
-  @Test
-  public void testPersistFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ThrowException;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("persist failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("persist failure", e.getCause().getMessage());
-    }
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  @Test
-  public void testPersistFalse() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("persist failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
-    }
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  @Test
-  public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ThrowException;
-    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("persist failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("persist failure", e.getCause().getMessage());
-    }
-    // verify both are rolled back and tx is invalidated
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
-  }
-
-  @Test
-  public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
-    ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("persist failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
-    }
-    // verify both are rolled back and tx is invalidated
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
-  }
-
-  @Test
-  public void testNoIndefiniteRetryByDefault() throws TransactionFailureException, InterruptedException {
-    // we want retry by default, so that engineers don't miss it
-    txClient.failCommits = 1000;
-    try {
-      // execute: add a change to ds1 and ds2
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("commit failed too many times to retry - exception should be thrown");
-    } catch (TransactionConflictException e) {
-      Assert.assertNull(e.getCause());
-    }
-
-    txClient.failCommits = 0;
-
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  @Test
-  public void testRetryByDefault() throws TransactionFailureException, InterruptedException {
-    // we want retry by default, so that engineers don't miss it
-    txClient.failCommits = 2;
-    // execute: add a change to ds1 and ds2
-    getExecutor().execute(testFunction, 10);
-    // should not fail, but continue
-
-    // verify both are committed
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertTrue(ds1.postCommitted);
-    Assert.assertTrue(ds2.postCommitted);
-    Assert.assertFalse(ds1.rolledBack);
-    Assert.assertFalse(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
-  }
-
-  @Test
-  public void testCommitFalse() throws TransactionFailureException, InterruptedException {
-    txClient.failCommits = 1;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutorWithNoRetry().execute(testFunction, 10);
-      Assert.fail("commit failed - exception should be thrown");
-    } catch (TransactionConflictException e) {
-      Assert.assertNull(e.getCause());
-    }
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  @Test
-  public void testCanCommitFalse() throws TransactionFailureException, InterruptedException {
-    txClient.failCanCommitOnce = true;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutorWithNoRetry().execute(testFunction, 10);
-      Assert.fail("commit failed - exception should be thrown");
-    } catch (TransactionConflictException e) {
-      Assert.assertNull(e.getCause());
-    }
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertFalse(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  @Test
-  public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failChangesTxOnce = InduceFailure.ThrowException;
-    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("get changes failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("changes failure", e.getCause().getMessage());
-    }
-    // verify both are rolled back and tx is invalidated
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertFalse(ds2.checked);
-    Assert.assertFalse(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
-  }
-
-  @Test
-  public void testFunctionAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutor().execute(testFunction, null);
-      Assert.fail("function failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("function failed", e.getCause().getMessage());
-    }
-    // verify both are rolled back and tx is invalidated
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertFalse(ds1.checked);
-    Assert.assertFalse(ds2.checked);
-    Assert.assertFalse(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
-  }
-
-  @Test
-  public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failStartTxOnce = InduceFailure.ThrowException;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("start failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("start failure", e.getCause().getMessage());
-    }
-    // verify both are not rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertFalse(ds2.started);
-    Assert.assertFalse(ds1.checked);
-    Assert.assertFalse(ds2.checked);
-    Assert.assertFalse(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertFalse(ds1.rolledBack);
-    Assert.assertFalse(ds2.rolledBack);
-    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
-  }
-
-  enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
-
-  static class DummyTxAware implements TransactionAware {
-
-    Transaction tx;
-    boolean started = false;
-    boolean committed = false;
-    boolean checked = false;
-    boolean rolledBack = false;
-    boolean postCommitted = false;
-    List<byte[]> changes = Lists.newArrayList();
-
-    InduceFailure failStartTxOnce = InduceFailure.NoFailure;
-    InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
-    InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
-    InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
-    InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
-
-    void addChange(byte[] key) {
-      changes.add(key);
-    }
-
-    void reset() {
-      tx = null;
-      started = false;
-      checked = false;
-      committed = false;
-      rolledBack = false;
-      postCommitted = false;
-      changes.clear();
-    }
-
-    @Override
-    public void startTx(Transaction tx) {
-      reset();
-      started = true;
-      this.tx = tx;
-      if (failStartTxOnce == InduceFailure.ThrowException) {
-        failStartTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("start failure");
-      }
-    }
-
-    @Override
-    public void updateTx(Transaction tx) {
-      this.tx = tx;
-    }
-
-    @Override
-    public Collection<byte[]> getTxChanges() {
-      checked = true;
-      if (failChangesTxOnce == InduceFailure.ThrowException) {
-        failChangesTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("changes failure");
-      }
-      return ImmutableList.copyOf(changes);
-    }
-
-    @Override
-    public boolean commitTx() throws Exception {
-      committed = true;
-      if (failCommitTxOnce == InduceFailure.ThrowException) {
-        failCommitTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("persist failure");
-      }
-      if (failCommitTxOnce == InduceFailure.ReturnFalse) {
-        failCommitTxOnce = InduceFailure.NoFailure;
-        return false;
-      }
-      return true;
-    }
-
-    @Override
-    public void postTxCommit() {
-      postCommitted = true;
-      if (failPostCommitTxOnce == InduceFailure.ThrowException) {
-        failPostCommitTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("post failure");
-      }
-    }
-
-    @Override
-    public boolean rollbackTx() throws Exception {
-      rolledBack = true;
-      if (failRollbackTxOnce == InduceFailure.ThrowException) {
-        failRollbackTxOnce = InduceFailure.NoFailure;
-        throw new RuntimeException("rollback failure");
-      }
-      if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
-        failRollbackTxOnce = InduceFailure.NoFailure;
-        return false;
-      }
-      return true;
-    }
-
-    @Override
-    public String getTransactionAwareName() {
-      return "dummy";
-    }
-  }
-
-  static class DummyTxClient extends InMemoryTxSystemClient {
-
-    boolean failCanCommitOnce = false;
-    int failCommits = 0;
-    enum CommitState {
-      Started, Committed, Aborted, Invalidated
-    }
-    CommitState state = CommitState.Started;
-
-    @Inject
-    DummyTxClient(TransactionManager txmgr) {
-      super(txmgr);
-    }
-
-    @Override
-    public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
-      if (failCanCommitOnce) {
-        failCanCommitOnce = false;
-        return false;
-      } else {
-        return super.canCommit(tx, changeIds);
-      }
-    }
-
-    @Override
-    public boolean commit(Transaction tx) throws TransactionNotInProgressException {
-      if (failCommits-- > 0) {
-        return false;
-      } else {
-        state = CommitState.Committed;
-        return super.commit(tx);
-      }
-    }
-
-    @Override
-    public Transaction startLong() {
-      state = CommitState.Started;
-      return super.startLong();
-    }
-
-    @Override
-    public Transaction startShort() {
-      state = CommitState.Started;
-      return super.startShort();
-    }
-
-    @Override
-    public Transaction startShort(int timeout) {
-      state = CommitState.Started;
-      return super.startShort(timeout);
-    }
-
-    @Override
-    public void abort(Transaction tx) {
-      state = CommitState.Aborted;
-      super.abort(tx);
-    }
-
-    @Override
-    public boolean invalidate(long tx) {
-      state = CommitState.Invalidated;
-      return super.invalidate(tx);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java
deleted file mode 100644
index ddd32db..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public class TransactionManagerTest extends TransactionSystemTest {
-
-  static Configuration conf = new Configuration();
-
-  TransactionManager txManager = null;
-  TransactionStateStorage txStateStorage = null;
-
-  @Override
-  protected TransactionSystemClient getClient() {
-    return new InMemoryTxSystemClient(txManager);
-  }
-
-  @Override
-  protected TransactionStateStorage getStateStorage() {
-    return txStateStorage;
-  }
-
-  @Before
-  public void before() {
-    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
-    // todo should create two sets of tests, one with LocalFileTxStateStorage and one with InMemoryTxStateStorage
-    txStateStorage = new InMemoryTransactionStateStorage();
-    txManager = new TransactionManager
-      (conf, txStateStorage, new TxMetricsCollector());
-    txManager.startAndWait();
-  }
-
-  @After
-  public void after() {
-    txManager.stopAndWait();
-  }
-
-  @Test
-  public void testTransactionCleanup() throws Exception {
-    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
-    conf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2);
-    // using a new tx manager that cleans up
-    TransactionManager txm = new TransactionManager
-      (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
-    txm.startAndWait();
-    try {
-      Assert.assertEquals(0, txm.getInvalidSize());
-      Assert.assertEquals(0, txm.getCommittedSize());
-      // start a transaction and leave it open
-      Transaction tx1 = txm.startShort();
-      // start a long running transaction and leave it open
-      Transaction tx2 = txm.startLong();
-      Transaction tx3 = txm.startLong();
-      // start and commit a bunch of transactions
-      for (int i = 0; i < 10; i++) {
-        Transaction tx = txm.startShort();
-        Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
-        Assert.assertTrue(txm.commit(tx));
-      }
-      // all of these should still be in the committed set
-      Assert.assertEquals(0, txm.getInvalidSize());
-      Assert.assertEquals(10, txm.getCommittedSize());
-      // sleep longer than the cleanup interval
-      TimeUnit.SECONDS.sleep(5);
-      // transaction should now be invalid
-      Assert.assertEquals(1, txm.getInvalidSize());
-      // run another transaction
-      Transaction txx = txm.startShort();
-      // verify the exclude
-      Assert.assertFalse(txx.isVisible(tx1.getTransactionId()));
-      Assert.assertFalse(txx.isVisible(tx2.getTransactionId()));
-      Assert.assertFalse(txx.isVisible(tx3.getTransactionId()));
-      // try to commit the last transaction that was started
-      Assert.assertTrue(txm.canCommit(txx, Collections.singleton(new byte[] { 0x0a })));
-      Assert.assertTrue(txm.commit(txx));
-
-      // now the committed change sets should be empty again
-      Assert.assertEquals(0, txm.getCommittedSize());
-      // cannot commit transaction as it was timed out
-      try {
-        txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
-        Assert.fail();
-      } catch (TransactionNotInProgressException e) {
-        // expected
-      }
-      txm.abort(tx1);
-      // abort should have removed from invalid
-      Assert.assertEquals(0, txm.getInvalidSize());
-      // run another bunch of transactions
-      for (int i = 0; i < 10; i++) {
-        Transaction tx = txm.startShort();
-        Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
-        Assert.assertTrue(txm.commit(tx));
-      }
-      // none of these should still be in the committed set (tx2 is long-running).
-      Assert.assertEquals(0, txm.getInvalidSize());
-      Assert.assertEquals(0, txm.getCommittedSize());
-      // commit tx2, abort tx3
-      Assert.assertTrue(txm.commit(tx2));
-      txm.abort(tx3);
-      // none of these should still be in the committed set (tx2 is long-running).
-      // Only tx3 is invalid list as it was aborted and is long-running. tx1 is short one and it rolled back its changes
-      // so it should NOT be in invalid list
-      Assert.assertEquals(1, txm.getInvalidSize());
-      Assert.assertEquals(tx3.getTransactionId(), (long) txm.getCurrentState().getInvalid().iterator().next());
-      Assert.assertEquals(1, txm.getExcludedListSize());
-    } finally {
-      txm.stopAndWait();
-    }
-  }
-
-  @Test
-  public void testLongTransactionCleanup() throws Exception {
-    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
-    conf.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2);
-    // using a new tx manager that cleans up
-    TransactionManager txm = new TransactionManager
-      (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
-    txm.startAndWait();
-    try {
-      Assert.assertEquals(0, txm.getInvalidSize());
-      Assert.assertEquals(0, txm.getCommittedSize());
-      
-      // start a long running transaction
-      Transaction tx1 = txm.startLong();
-      
-      Assert.assertEquals(0, txm.getInvalidSize());
-      Assert.assertEquals(0, txm.getCommittedSize());
-
-      // sleep longer than the cleanup interval
-      TimeUnit.SECONDS.sleep(5);
-
-      // transaction should now be invalid
-      Assert.assertEquals(1, txm.getInvalidSize());
-      Assert.assertEquals(0, txm.getCommittedSize());
-
-      // cannot commit transaction as it was timed out
-      try {
-        txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
-        Assert.fail();
-      } catch (TransactionNotInProgressException e) {
-        // expected
-      }
-      
-      txm.abort(tx1);
-      // abort should not remove long running transaction from invalid list
-      Assert.assertEquals(1, txm.getInvalidSize());
-    } finally {
-      txm.stopAndWait();
-    }
-  }
-  
-  @Test
-  public void testTruncateInvalid() throws Exception {
-    InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage();
-    Configuration testConf = new Configuration(conf);
-    // No snapshots
-    testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1);
-    TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector());
-    txm1.startAndWait();
-
-    TransactionManager txm2 = null;
-    Transaction tx1;
-    Transaction tx2;
-    Transaction tx3;
-    Transaction tx4;
-    Transaction tx5;
-    Transaction tx6;
-    try {
-      Assert.assertEquals(0, txm1.getInvalidSize());
-
-      // start a few transactions
-      tx1 = txm1.startLong();
-      tx2 = txm1.startShort();
-      tx3 = txm1.startLong();
-      tx4 = txm1.startShort();
-      tx5 = txm1.startLong();
-      tx6 = txm1.startShort();
-
-      // invalidate tx1, tx2, tx5 and tx6
-      txm1.invalidate(tx1.getTransactionId());
-      txm1.invalidate(tx2.getTransactionId());
-      txm1.invalidate(tx5.getTransactionId());
-      txm1.invalidate(tx6.getTransactionId());
-
-      // tx1, tx2, tx5 and tx6 should be in invalid list
-      Assert.assertEquals(
-        ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(),
-            tx6.getTransactionId()),
-        txm1.getCurrentState().getInvalid()
-      );
-      
-      // remove tx1 and tx6 from invalid list
-      Assert.assertTrue(txm1.truncateInvalidTx(ImmutableSet.of(tx1.getTransactionId(), tx6.getTransactionId())));
-      
-      // only tx2 and tx5 should be in invalid list now
-      Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
-                          txm1.getCurrentState().getInvalid());
-      
-      // removing in-progress transactions should not have any effect
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm1.getCurrentState().getInProgress().keySet());
-      Assert.assertFalse(txm1.truncateInvalidTx(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId())));
-      // no change to in-progress
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm1.getCurrentState().getInProgress().keySet());
-      // no change to invalid list
-      Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
-                          txm1.getCurrentState().getInvalid());
-
-      // Test transaction edit logs replay
-      // Start another transaction manager without stopping txm1 so that snapshot does not get written,
-      // and all logs can be replayed.
-      txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector());
-      txm2.startAndWait();
-      Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
-                          txm2.getCurrentState().getInvalid());
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm2.getCurrentState().getInProgress().keySet());
-    } finally {
-      txm1.stopAndWait();
-      if (txm2 != null) {
-        txm2.stopAndWait();
-      }
-    }
-  }
-
-  @Test
-  public void testTruncateInvalidBeforeTime() throws Exception {
-    InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage();
-    Configuration testConf = new Configuration(conf);
-    // No snapshots
-    testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1);
-    TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector());
-    txm1.startAndWait();
-
-    TransactionManager txm2 = null;
-    Transaction tx1;
-    Transaction tx2;
-    Transaction tx3;
-    Transaction tx4;
-    Transaction tx5;
-    Transaction tx6;
-    try {
-      Assert.assertEquals(0, txm1.getInvalidSize());
-
-      // start a few transactions
-      tx1 = txm1.startLong();
-      tx2 = txm1.startShort();
-      // Sleep so that transaction ids get generated a millisecond apart for assertion
-      // TEPHRA-63 should eliminate the need to sleep
-      TimeUnit.MILLISECONDS.sleep(1);
-      long timeBeforeTx3 = System.currentTimeMillis();
-      tx3 = txm1.startLong();
-      tx4 = txm1.startShort();
-      TimeUnit.MILLISECONDS.sleep(1);
-      long timeBeforeTx5 = System.currentTimeMillis();
-      tx5 = txm1.startLong();
-      tx6 = txm1.startShort();
-
-      // invalidate tx1, tx2, tx5 and tx6
-      txm1.invalidate(tx1.getTransactionId());
-      txm1.invalidate(tx2.getTransactionId());
-      txm1.invalidate(tx5.getTransactionId());
-      txm1.invalidate(tx6.getTransactionId());
-
-      // tx1, tx2, tx5 and tx6 should be in invalid list
-      Assert.assertEquals(
-        ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(),
-            tx6.getTransactionId()),
-        txm1.getCurrentState().getInvalid()
-      );
-
-      // remove transactions before tx3 from invalid list
-      Assert.assertTrue(txm1.truncateInvalidTxBefore(timeBeforeTx3));
-
-      // only tx5 and tx6 should be in invalid list now
-      Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
-                          txm1.getCurrentState().getInvalid());
-
-      // removing invalid transactions before tx5 should throw exception since tx3 and tx4 are in-progress
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm1.getCurrentState().getInProgress().keySet());
-      try {
-        txm1.truncateInvalidTxBefore(timeBeforeTx5);
-        Assert.fail("Expected InvalidTruncateTimeException exception");
-      } catch (InvalidTruncateTimeException e) {
-        // Expected exception
-      }
-      // no change to in-progress
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm1.getCurrentState().getInProgress().keySet());
-      // no change to invalid list
-      Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
-                          txm1.getCurrentState().getInvalid());
-
-      // Test transaction edit logs replay
-      // Start another transaction manager without stopping txm1 so that snapshot does not get written, 
-      // and all logs can be replayed.
-      txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector());
-      txm2.startAndWait();
-      Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
-                          txm2.getCurrentState().getInvalid());
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm2.getCurrentState().getInProgress().keySet());
-    } finally {
-      txm1.stopAndWait();
-      if (txm2 != null) {
-        txm2.stopAndWait();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java
deleted file mode 100644
index e54956d..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.distributed.TransactionServiceClient;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.concurrent.CountDownLatch;
-
-/**
- * Test for verifying TransactionServiceMain works correctly.
- */
-public class TransactionServiceMainTest {
-
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testClientServer() throws Exception {
-    // Simply start a transaction server and connect to it with the client.
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
-    zkServer.startAndWait();
-
-    try {
-      Configuration conf = new Configuration();
-      conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
-      conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
-
-      final TransactionServiceMain main = new TransactionServiceMain(conf);
-      final CountDownLatch latch = new CountDownLatch(1);
-      Thread t = new Thread() {
-        @Override
-        public void run() {
-          try {
-            main.start();
-            latch.countDown();
-          } catch (Exception e) {
-            throw Throwables.propagate(e);
-          }
-        }
-      };
-
-      try {
-        t.start();
-        // Wait for service to startup
-        latch.await();
-        TransactionServiceClient.doMain(true, conf);
-      } finally {
-        main.stop();
-        t.join();
-      }
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java
deleted file mode 100644
index cca6f5a..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.persist.TransactionSnapshot;
-import co.cask.tephra.persist.TransactionStateStorage;
-import com.google.common.collect.ImmutableSet;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Common tests for {@link TransactionSystemClient}; subclasses provide the client and state storage.
- */
-public abstract class TransactionSystemTest {
-
-  public static final byte[] C1 = new byte[] { 'c', '1' };
-  public static final byte[] C2 = new byte[] { 'c', '2' };
-  public static final byte[] C3 = new byte[] { 'c', '3' };
-  public static final byte[] C4 = new byte[] { 'c', '4' };
-
-  protected abstract TransactionSystemClient getClient() throws Exception;
-
-  protected abstract TransactionStateStorage getStateStorage() throws Exception;
-
-  @Test
-  public void testCommitRaceHandling() throws Exception {
-    TransactionSystemClient client1 = getClient();
-    TransactionSystemClient client2 = getClient();
-
-    Transaction tx1 = client1.startShort();
-    Transaction tx2 = client2.startShort();
-
-    Assert.assertTrue(client1.canCommit(tx1, asList(C1, C2)));
-    // second one can also commit even though there are conflicts with first, since first one hasn't committed yet
-    Assert.assertTrue(client2.canCommit(tx2, asList(C2, C3)));
-
-    Assert.assertTrue(client1.commit(tx1));
-
-    // now second one should not commit, since there are conflicts with tx1 that has been committed
-    Assert.assertFalse(client2.commit(tx2));
-  }
-
-  @Test
-  public void testMultipleCommitsAtSameTime() throws Exception {
-    // We want to check that if two txs finish at same time (wrt tx manager) they do not overwrite changesets of each
-    // other in tx manager used for conflicts detection (we had this bug)
-    // NOTE: you don't have to use multiple clients for that
-    TransactionSystemClient client1 = getClient();
-    TransactionSystemClient client2 = getClient();
-    TransactionSystemClient client3 = getClient();
-    TransactionSystemClient client4 = getClient();
-    TransactionSystemClient client5 = getClient();
-
-    Transaction tx1 = client1.startShort();
-    Transaction tx2 = client2.startShort();
-    Transaction tx3 = client3.startShort();
-    Transaction tx4 = client4.startShort();
-    Transaction tx5 = client5.startShort();
-
-    Assert.assertTrue(client1.canCommit(tx1, asList(C1)));
-    Assert.assertTrue(client1.commit(tx1));
-
-    Assert.assertTrue(client2.canCommit(tx2, asList(C2)));
-    Assert.assertTrue(client2.commit(tx2));
-
-    // verifying conflicts detection
-    Assert.assertFalse(client3.canCommit(tx3, asList(C1)));
-    Assert.assertFalse(client4.canCommit(tx4, asList(C2)));
-    Assert.assertTrue(client5.canCommit(tx5, asList(C3)));
-  }
-
-  @Test
-  public void testCommitTwice() throws Exception {
-    TransactionSystemClient client = getClient();
-    Transaction tx = client.startShort();
-
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
-    Assert.assertTrue(client.commit(tx));
-    // cannot commit twice same tx
-    try {
-      Assert.assertFalse(client.commit(tx));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testAbortTwice() throws Exception {
-    TransactionSystemClient client = getClient();
-    Transaction tx = client.startShort();
-
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
-    client.abort(tx);
-    // abort of not active tx has no effect
-    client.abort(tx);
-  }
-
-  @Test
-  public void testReuseTx() throws Exception {
-    TransactionSystemClient client = getClient();
-    Transaction tx = client.startShort();
-
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
-    Assert.assertTrue(client.commit(tx));
-    // can't re-use same tx again
-    try {
-      client.canCommit(tx, asList(C3, C4));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
-    try {
-      Assert.assertFalse(client.commit(tx));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
-
-    // abort of not active tx has no effect
-    client.abort(tx);
-  }
-
-  @Test
-  public void testUseNotStarted() throws Exception {
-    TransactionSystemClient client = getClient();
-    Transaction tx1 = client.startShort();
-    Assert.assertTrue(client.commit(tx1));
-
-    // we know this one is older than current writePointer and was not used
-    Transaction txOld = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() - 1,
-                                        new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, 
-                                        TransactionType.SHORT);
-    try {
-      Assert.assertFalse(client.canCommit(txOld, asList(C3, C4)));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
-    try {
-      Assert.assertFalse(client.commit(txOld));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
-    // abort of not active tx has no effect
-    client.abort(txOld);
-
-    // we know this one is newer than current readPointer and was not used
-    Transaction txNew = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() + 1,
-                                        new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, 
-                                        TransactionType.SHORT);
-    try {
-      Assert.assertFalse(client.canCommit(txNew, asList(C3, C4)));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
-    try {
-      Assert.assertFalse(client.commit(txNew));
-      Assert.fail();
-    } catch (TransactionNotInProgressException e) {
-      // expected
-    }
-    // abort of not active tx has no effect
-    client.abort(txNew);
-  }
-
-  @Test
-  public void testAbortAfterCommit() throws Exception {
-    TransactionSystemClient client = getClient();
-    Transaction tx = client.startShort();
-
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
-    Assert.assertTrue(client.commit(tx));
-    // abort of not active tx has no effect
-    client.abort(tx);
-  }
-
-  // todo add test invalidate method
-  @Test
-  public void testInvalidateTx() throws Exception {
-    TransactionSystemClient client = getClient();
-    // Invalidate an in-progress tx
-    Transaction tx1 = client.startShort();
-    client.canCommit(tx1, asList(C1, C2));
-    Assert.assertTrue(client.invalidate(tx1.getTransactionId()));
-    // Cannot invalidate a committed tx
-    Transaction tx2 = client.startShort();
-    client.canCommit(tx2, asList(C3, C4));
-    client.commit(tx2);
-    Assert.assertFalse(client.invalidate(tx2.getTransactionId()));
-  }
-
-  @Test
-  public void testResetState() throws Exception {
-    // have tx in progress, committing and committed then reset,
-    // get the last snapshot and see that it is empty
-    TransactionSystemClient client = getClient();
-    TransactionStateStorage stateStorage = getStateStorage();
-
-    Transaction tx1 = client.startShort();
-    Transaction tx2 = client.startShort();
-    client.canCommit(tx1, asList(C1, C2));
-    client.commit(tx1);
-    client.canCommit(tx2, asList(C3, C4));
-
-    Transaction txPreReset = client.startShort();
-    long currentTs = System.currentTimeMillis();
-    client.resetState();
-
-    TransactionSnapshot snapshot = stateStorage.getLatestSnapshot();
-    Assert.assertTrue(snapshot.getTimestamp() >= currentTs);
-    Assert.assertEquals(0, snapshot.getInvalid().size());
-    Assert.assertEquals(0, snapshot.getInProgress().size());
-    Assert.assertEquals(0, snapshot.getCommittingChangeSets().size());
-    Assert.assertEquals(0, snapshot.getCommittedChangeSets().size());
-
-    // confirm that transaction IDs are not reset
-    Transaction txPostReset = client.startShort();
-    Assert.assertTrue("New tx ID should be greater than last ID before reset",
-                      txPostReset.getTransactionId() > txPreReset.getTransactionId());
-  }
-  
-  @Test
-  public void testTruncateInvalidTx() throws Exception {
-    // Start a few transactions and invalidate all of them
-    TransactionSystemClient client = getClient();
-    Transaction tx1 = client.startLong();
-    Transaction tx2 = client.startShort();
-    Transaction tx3 = client.startLong();
-    
-    client.invalidate(tx1.getTransactionId());
-    client.invalidate(tx2.getTransactionId());
-    client.invalidate(tx3.getTransactionId());
-    
-    // Remove tx2 and tx3 from invalid list
-    Assert.assertTrue(client.truncateInvalidTx(ImmutableSet.of(tx2.getTransactionId(), tx3.getTransactionId())));
-    
-    Transaction tx = client.startShort();
-    // Only tx1 should be in invalid list now
-    Assert.assertArrayEquals(new long[] {tx1.getTransactionId()}, tx.getInvalids());
-    client.abort(tx);
-  }
-
-  @Test
-  public void testTruncateInvalidTxBefore() throws Exception {
-    // Start a few transactions
-    TransactionSystemClient client = getClient();
-    Transaction tx1 = client.startLong();
-    Transaction tx2 = client.startShort();
-    // Sleep so that transaction ids get generated a millisecond apart for assertion
-    // TEPHRA-63 should eliminate the need to sleep
-    TimeUnit.MILLISECONDS.sleep(1);
-    long beforeTx3 = System.currentTimeMillis();
-    Transaction tx3 = client.startLong();
-    
-    try {
-      // throws exception since tx1 and tx2 are still in-progress
-      client.truncateInvalidTxBefore(beforeTx3);
-      Assert.fail("Expected InvalidTruncateTimeException exception");
-    } catch (InvalidTruncateTimeException e) {
-      // Expected exception
-    }
-    
-    // Invalidate all of them
-    client.invalidate(tx1.getTransactionId());
-    client.invalidate(tx2.getTransactionId());
-    client.invalidate(tx3.getTransactionId());
-
-    // Remove transactions before time beforeTx3
-    Assert.assertTrue(client.truncateInvalidTxBefore(beforeTx3));
-
-    Transaction tx = client.startShort();
-    // Only tx3 should be in invalid list now
-    Assert.assertArrayEquals(new long[] {tx3.getTransactionId()}, tx.getInvalids());
-    client.abort(tx);
-  }
-
-  @Test
-  public void testGetInvalidSize() throws Exception {
-    // Start a few transactions and invalidate all of them
-    TransactionSystemClient client = getClient();
-    Transaction tx1 = client.startLong();
-    Transaction tx2 = client.startShort();
-    Transaction tx3 = client.startLong();
-
-    Assert.assertEquals(0, client.getInvalidSize());
-
-    client.invalidate(tx1.getTransactionId());
-    client.invalidate(tx2.getTransactionId());
-    client.invalidate(tx3.getTransactionId());
-
-    Assert.assertEquals(3, client.getInvalidSize());
-  }
-
-  private Collection<byte[]> asList(byte[]... val) {
-    return Arrays.asList(val);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java
deleted file mode 100644
index b1c7e43..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.primitives.Longs;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Set;
-
-/**
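- * Tests {@link Transaction} visibility for SNAPSHOT, SNAPSHOT_EXCLUDE_CURRENT and SNAPSHOT_ALL levels.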
- */
-public class TransactionTest {
-  // Current transaction id
-  private final long txId = 200L;
-  // Read pointer for current transaction
-  private final long readPointer = 100L;
-  // Transactions committed before current transaction was started.
-  private final Set<Long> priorCommitted = ImmutableSet.of(80L, 99L, 100L);
-  // Transactions committed after current transaction was started.
-  private final Set<Long> postCommitted = ImmutableSet.of(150L, 180L, 210L);
-  // Invalid transactions before current transaction was started.
-  private final Set<Long> priorInvalids = ImmutableSet.of(90L, 110L, 190L);
-  // Invalid transactions after current transaction was started.
-  private final Set<Long> postInvalids = ImmutableSet.of(201L, 221L, 231L);
-  // Transactions in progress before current transaction was started.
-  private final Set<Long> priorInProgress = ImmutableSet.of(95L, 120L, 150L);
-  // Transactions in progress after current transaction was started.
-  private final Set<Long> postInProgress = ImmutableSet.of(205L, 215L, 300L);
-
-  @Test
-  public void testSnapshotVisibility() throws Exception {
-    Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT;
-
-    Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
-    Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
-                                     toSortedArray(priorInProgress), 95L,
-                                     TransactionType.SHORT, toSortedArray(checkPointers),
-                                     visibilityLevel);
-    Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L);
-    Set<Long> notVisibleCurrent = ImmutableSet.of();
-    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
-                     visibleCurrent, notVisibleCurrent, tx);
-
-    checkPointers = ImmutableSet.of();
-    tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L,
-                                     TransactionType.SHORT, toSortedArray(checkPointers),
-                         visibilityLevel);
-    visibleCurrent = ImmutableSet.of(txId);
-    notVisibleCurrent = ImmutableSet.of();
-    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
-                     visibleCurrent, notVisibleCurrent, tx);
-  }
-
-  @Test
-  public void testSnapshotExcludeVisibility() throws Exception {
-    Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
-
-    Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
-    Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
-                                     toSortedArray(priorInProgress), 95L,
-                                     TransactionType.SHORT, toSortedArray(checkPointers),
-                                     visibilityLevel);
-    Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L);
-    Set<Long> notVisibleCurrent = ImmutableSet.of(250L);
-    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
-                     visibleCurrent, notVisibleCurrent, tx);
-
-    checkPointers = ImmutableSet.of();
-    tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L,
-                         TransactionType.SHORT, toSortedArray(checkPointers),
-                         visibilityLevel);
-    visibleCurrent = ImmutableSet.of();
-    notVisibleCurrent = ImmutableSet.of(txId);
-    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
-                     visibleCurrent, notVisibleCurrent, tx);
-  }
-
-  @Test
-  public void testSnapshotAllVisibility() throws Exception {
-    Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_ALL;
-
-    Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
-    Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
-                                     toSortedArray(priorInProgress), 95L,
-                                     TransactionType.SHORT, toSortedArray(checkPointers),
-                                     visibilityLevel);
-    Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L);
-    Set<Long> notVisibleCurrent = ImmutableSet.of();
-    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
-                     visibleCurrent, notVisibleCurrent, tx);
-
-    checkPointers = ImmutableSet.of();
-    tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids),
-                         toSortedArray(priorInProgress), 95L,
-                         TransactionType.SHORT, toSortedArray(checkPointers),
-                         visibilityLevel);
-    visibleCurrent = ImmutableSet.of(txId);
-    notVisibleCurrent = ImmutableSet.of();
-    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
-                     visibleCurrent, notVisibleCurrent, tx);
-  }
-
-  private void assertVisibility(Set<Long> priorCommitted, Set<Long> postCommitted, Set<Long> priorInvalids,
-                                Set<Long> postInvalids, Set<Long> priorInProgress, Set<Long> postInProgress,
-                                Set<Long> visibleCurrent, Set<Long> notVisibleCurrent,
-                                Transaction tx) {
-    // Verify visible snapshots of tx are visible
-    for (long t : visibleCurrent) {
-      Assert.assertTrue("Assertion error for version = " + t, tx.isVisible(t));
-    }
-
-    // Verify not visible snapshots of tx are not visible
-    for (long t : notVisibleCurrent) {
-      Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
-    }
-
-    // Verify prior committed versions are visible
-    for (long t : priorCommitted) {
-      Assert.assertTrue("Assertion error for version = " + t, tx.isVisible(t));
-    }
-
-    // Verify versions committed after tx started, and not part of tx are not visible
-    for (long t : postCommitted) {
-      Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
-    }
-
-    // Verify invalid and in-progress versions are not visible
-    for (long t : Iterables.concat(priorInvalids, postInvalids, priorInProgress, postInProgress)) {
-      Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
-    }
-  }
-
-  private long[] toSortedArray(Set<Long> set) {
-    long[] array = Longs.toArray(set);
-    Arrays.sort(array);
-    return array;
-  }
-}