You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:27 UTC

[27/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
new file mode 100644
index 0000000..28ccc6e
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.snapshot.DefaultSnapshotCodec;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Tests the transaction executor.
+ */
+public class TransactionExecutorTest {
+  private static DummyTxClient txClient;
+  private static TransactionExecutorFactory factory;
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    final Configuration conf = new Configuration();
+    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+    Injector injector = Guice.createInjector(
+      new ConfigModule(conf),
+      new DiscoveryModules().getInMemoryModules(),
+      Modules.override(
+        new TransactionModules().getInMemoryModules()).with(new AbstractModule() {
+        @Override
+        protected void configure() {
+          TransactionManager txManager = new TransactionManager(conf);
+          txManager.startAndWait();
+          bind(TransactionManager.class).toInstance(txManager);
+          bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class);
+        }
+      }));
+
+    txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class);
+    factory = injector.getInstance(TransactionExecutorFactory.class);
+  }
+
+  final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
+  final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2);
+
+  private TransactionExecutor getExecutor() {
+    return factory.createExecutor(txAwares);
+  }
+
+  private TransactionExecutor getExecutorWithNoRetry() {
+    return new DefaultTransactionExecutor(txClient, txAwares, RetryStrategies.noRetries());
+  }
+
+  static final byte[] A = { 'a' };
+  static final byte[] B = { 'b' };
+
+  final TransactionExecutor.Function<Integer, Integer> testFunction =
+    new TransactionExecutor.Function<Integer, Integer>() {
+      @Override
+      public Integer apply(@Nullable Integer input) {
+        ds1.addChange(A);
+        ds2.addChange(B);
+        if (input == null) {
+          throw new RuntimeException("function failed");
+        }
+        return input * input;
+      }
+  };
+
+  @Before
+  public void resetTxAwares() {
+    ds1.reset();
+    ds2.reset();
+  }
+
+  @Test
+  public void testSuccessful() throws TransactionFailureException, InterruptedException {
+    // execute: add a change to ds1 and ds2
+    Integer result = getExecutor().execute(testFunction, 10);
+    // verify both are committed and post-committed
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertTrue(ds2.committed);
+    Assert.assertTrue(ds1.postCommitted);
+    Assert.assertTrue(ds2.postCommitted);
+    Assert.assertFalse(ds1.rolledBack);
+    Assert.assertFalse(ds2.rolledBack);
+    Assert.assertTrue(100 == result);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
+  }
+
+  @Test
+  public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
+    ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
+    // execute: add a change to ds1 and ds2
+    try {
+      getExecutor().execute(testFunction, 10);
+      Assert.fail("post commit failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("post failure", e.getCause().getMessage());
+    }
+    // verify both are committed and post-committed
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertTrue(ds2.committed);
+    Assert.assertTrue(ds1.postCommitted);
+    Assert.assertTrue(ds2.postCommitted);
+    Assert.assertFalse(ds1.rolledBack);
+    Assert.assertFalse(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
+  }
+
+  @Test
+  public void testPersistFailure() throws TransactionFailureException, InterruptedException {
+    ds1.failCommitTxOnce = InduceFailure.ThrowException;
+    // execute: add a change to ds1 and ds2
+    try {
+      getExecutor().execute(testFunction, 10);
+      Assert.fail("persist failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("persist failure", e.getCause().getMessage());
+    }
+    // verify both are rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+  }
+
+  @Test
+  public void testPersistFalse() throws TransactionFailureException, InterruptedException {
+    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+    // execute: add a change to ds1 and ds2
+    try {
+      getExecutor().execute(testFunction, 10);
+      Assert.fail("persist failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
+    }
+    // verify both are rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+  }
+
+  @Test
+  public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+    ds1.failCommitTxOnce = InduceFailure.ThrowException;
+    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+    // execute: add a change to ds1 and ds2
+    try {
+      getExecutor().execute(testFunction, 10);
+      Assert.fail("persist failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("persist failure", e.getCause().getMessage());
+    }
+    // verify both are rolled back and tx is invalidated
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
+  }
+
+  @Test
+  public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
+    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+    ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+    // execute: add a change to ds1 and ds2
+    try {
+      getExecutor().execute(testFunction, 10);
+      Assert.fail("persist failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
+    }
+    // verify both are rolled back and tx is invalidated
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
+  }
+
+  @Test
+  public void testNoIndefiniteRetryByDefault() throws TransactionFailureException, InterruptedException {
+    // we want retry by default, so that engineers don't miss it
+    txClient.failCommits = 1000;
+    try {
+      // execute: add a change to ds1 and ds2
+      getExecutor().execute(testFunction, 10);
+      Assert.fail("commit failed too many times to retry - exception should be thrown");
+    } catch (TransactionConflictException e) {
+      Assert.assertNull(e.getCause());
+    }
+
+    txClient.failCommits = 0;
+
+    // verify both are rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertTrue(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+  }
+
+  @Test
+  public void testRetryByDefault() throws TransactionFailureException, InterruptedException {
+    // we want retry by default, so that engineers don't miss it
+    txClient.failCommits = 2;
+    // execute: add a change to ds1 and ds2
+    getExecutor().execute(testFunction, 10);
+    // should not fail, but continue
+
+    // verify both are committed
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertTrue(ds2.committed);
+    Assert.assertTrue(ds1.postCommitted);
+    Assert.assertTrue(ds2.postCommitted);
+    Assert.assertFalse(ds1.rolledBack);
+    Assert.assertFalse(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
+  }
+
+  @Test
+  public void testCommitFalse() throws TransactionFailureException, InterruptedException {
+    txClient.failCommits = 1;
+    // execute: add a change to ds1 and ds2
+    try {
+      getExecutorWithNoRetry().execute(testFunction, 10);
+      Assert.fail("commit failed - exception should be thrown");
+    } catch (TransactionConflictException e) {
+      Assert.assertNull(e.getCause());
+    }
+    // verify both are rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertTrue(ds1.committed);
+    Assert.assertTrue(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+  }
+
+  @Test
+  public void testCanCommitFalse() throws TransactionFailureException, InterruptedException {
+    txClient.failCanCommitOnce = true;
+    // execute: add a change to ds1 and ds2
+    try {
+      getExecutorWithNoRetry().execute(testFunction, 10);
+      Assert.fail("commit failed - exception should be thrown");
+    } catch (TransactionConflictException e) {
+      Assert.assertNull(e.getCause());
+    }
+    // verify both are rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertTrue(ds2.checked);
+    Assert.assertFalse(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+  }
+
+  @Test
+  public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+    ds1.failChangesTxOnce = InduceFailure.ThrowException;
+    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+    // execute: add a change to ds1 and ds2
+    try {
+      getExecutor().execute(testFunction, 10);
+      Assert.fail("get changes failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("changes failure", e.getCause().getMessage());
+    }
+    // verify both are rolled back and tx is invalidated
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertTrue(ds1.checked);
+    Assert.assertFalse(ds2.checked);
+    Assert.assertFalse(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
+  }
+
+  @Test
+  public void testFunctionAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+    ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+    // execute: add a change to ds1 and ds2
+    try {
+      getExecutor().execute(testFunction, null);
+      Assert.fail("function failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("function failed", e.getCause().getMessage());
+    }
+    // verify both are rolled back and tx is invalidated
+    Assert.assertTrue(ds1.started);
+    Assert.assertTrue(ds2.started);
+    Assert.assertFalse(ds1.checked);
+    Assert.assertFalse(ds2.checked);
+    Assert.assertFalse(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertTrue(ds1.rolledBack);
+    Assert.assertTrue(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
+  }
+
+  @Test
+  public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+    ds1.failStartTxOnce = InduceFailure.ThrowException;
+    // execute: add a change to ds1 and ds2
+    try {
+      getExecutor().execute(testFunction, 10);
+      Assert.fail("start failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("start failure", e.getCause().getMessage());
+    }
+    // verify both are not rolled back and tx is aborted
+    Assert.assertTrue(ds1.started);
+    Assert.assertFalse(ds2.started);
+    Assert.assertFalse(ds1.checked);
+    Assert.assertFalse(ds2.checked);
+    Assert.assertFalse(ds1.committed);
+    Assert.assertFalse(ds2.committed);
+    Assert.assertFalse(ds1.postCommitted);
+    Assert.assertFalse(ds2.postCommitted);
+    Assert.assertFalse(ds1.rolledBack);
+    Assert.assertFalse(ds2.rolledBack);
+    Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+  }
+
+  enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
+
+  static class DummyTxAware implements TransactionAware {
+
+    Transaction tx;
+    boolean started = false;
+    boolean committed = false;
+    boolean checked = false;
+    boolean rolledBack = false;
+    boolean postCommitted = false;
+    List<byte[]> changes = Lists.newArrayList();
+
+    InduceFailure failStartTxOnce = InduceFailure.NoFailure;
+    InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
+    InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
+    InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
+    InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
+
+    void addChange(byte[] key) {
+      changes.add(key);
+    }
+
+    void reset() {
+      tx = null;
+      started = false;
+      checked = false;
+      committed = false;
+      rolledBack = false;
+      postCommitted = false;
+      changes.clear();
+    }
+
+    @Override
+    public void startTx(Transaction tx) {
+      reset();
+      started = true;
+      this.tx = tx;
+      if (failStartTxOnce == InduceFailure.ThrowException) {
+        failStartTxOnce = InduceFailure.NoFailure;
+        throw new RuntimeException("start failure");
+      }
+    }
+
+    @Override
+    public void updateTx(Transaction tx) {
+      this.tx = tx;
+    }
+
+    @Override
+    public Collection<byte[]> getTxChanges() {
+      checked = true;
+      if (failChangesTxOnce == InduceFailure.ThrowException) {
+        failChangesTxOnce = InduceFailure.NoFailure;
+        throw new RuntimeException("changes failure");
+      }
+      return ImmutableList.copyOf(changes);
+    }
+
+    @Override
+    public boolean commitTx() throws Exception {
+      committed = true;
+      if (failCommitTxOnce == InduceFailure.ThrowException) {
+        failCommitTxOnce = InduceFailure.NoFailure;
+        throw new RuntimeException("persist failure");
+      }
+      if (failCommitTxOnce == InduceFailure.ReturnFalse) {
+        failCommitTxOnce = InduceFailure.NoFailure;
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public void postTxCommit() {
+      postCommitted = true;
+      if (failPostCommitTxOnce == InduceFailure.ThrowException) {
+        failPostCommitTxOnce = InduceFailure.NoFailure;
+        throw new RuntimeException("post failure");
+      }
+    }
+
+    @Override
+    public boolean rollbackTx() throws Exception {
+      rolledBack = true;
+      if (failRollbackTxOnce == InduceFailure.ThrowException) {
+        failRollbackTxOnce = InduceFailure.NoFailure;
+        throw new RuntimeException("rollback failure");
+      }
+      if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
+        failRollbackTxOnce = InduceFailure.NoFailure;
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String getTransactionAwareName() {
+      return "dummy";
+    }
+  }
+
+  static class DummyTxClient extends InMemoryTxSystemClient {
+
+    boolean failCanCommitOnce = false;
+    int failCommits = 0;
+    enum CommitState {
+      Started, Committed, Aborted, Invalidated
+    }
+    CommitState state = CommitState.Started;
+
+    @Inject
+    DummyTxClient(TransactionManager txmgr) {
+      super(txmgr);
+    }
+
+    @Override
+    public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
+      if (failCanCommitOnce) {
+        failCanCommitOnce = false;
+        return false;
+      } else {
+        return super.canCommit(tx, changeIds);
+      }
+    }
+
+    @Override
+    public boolean commit(Transaction tx) throws TransactionNotInProgressException {
+      if (failCommits-- > 0) {
+        return false;
+      } else {
+        state = CommitState.Committed;
+        return super.commit(tx);
+      }
+    }
+
+    @Override
+    public Transaction startLong() {
+      state = CommitState.Started;
+      return super.startLong();
+    }
+
+    @Override
+    public Transaction startShort() {
+      state = CommitState.Started;
+      return super.startShort();
+    }
+
+    @Override
+    public Transaction startShort(int timeout) {
+      state = CommitState.Started;
+      return super.startShort(timeout);
+    }
+
+    @Override
+    public void abort(Transaction tx) {
+      state = CommitState.Aborted;
+      super.abort(tx);
+    }
+
+    @Override
+    public boolean invalidate(long tx) {
+      state = CommitState.Invalidated;
+      return super.invalidate(tx);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
new file mode 100644
index 0000000..f74e209
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TransactionManagerTest extends TransactionSystemTest {
+
+  static Configuration conf = new Configuration();
+
+  TransactionManager txManager = null;
+  TransactionStateStorage txStateStorage = null;
+
+  @Override
+  protected TransactionSystemClient getClient() {
+    return new InMemoryTxSystemClient(txManager);
+  }
+
+  @Override
+  protected TransactionStateStorage getStateStorage() {
+    return txStateStorage;
+  }
+
+  @Before
+  public void before() {
+    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
+    // todo should create two sets of tests, one with LocalFileTxStateStorage and one with InMemoryTxStateStorage
+    txStateStorage = new InMemoryTransactionStateStorage();
+    txManager = new TransactionManager
+      (conf, txStateStorage, new TxMetricsCollector());
+    txManager.startAndWait();
+  }
+
+  @After
+  public void after() {
+    txManager.stopAndWait();
+  }
+
+  @Test
+  public void testTransactionCleanup() throws Exception {
+    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
+    conf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2);
+    // using a new tx manager that cleans up
+    TransactionManager txm = new TransactionManager
+      (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+    txm.startAndWait();
+    try {
+      Assert.assertEquals(0, txm.getInvalidSize());
+      Assert.assertEquals(0, txm.getCommittedSize());
+      // start a transaction and leave it open
+      Transaction tx1 = txm.startShort();
+      // start a long running transaction and leave it open
+      Transaction tx2 = txm.startLong();
+      Transaction tx3 = txm.startLong();
+      // start and commit a bunch of transactions
+      for (int i = 0; i < 10; i++) {
+        Transaction tx = txm.startShort();
+        Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
+        Assert.assertTrue(txm.commit(tx));
+      }
+      // all of these should still be in the committed set
+      Assert.assertEquals(0, txm.getInvalidSize());
+      Assert.assertEquals(10, txm.getCommittedSize());
+      // sleep longer than the cleanup interval
+      TimeUnit.SECONDS.sleep(5);
+      // transaction should now be invalid
+      Assert.assertEquals(1, txm.getInvalidSize());
+      // run another transaction
+      Transaction txx = txm.startShort();
+      // verify the exclude
+      Assert.assertFalse(txx.isVisible(tx1.getTransactionId()));
+      Assert.assertFalse(txx.isVisible(tx2.getTransactionId()));
+      Assert.assertFalse(txx.isVisible(tx3.getTransactionId()));
+      // try to commit the last transaction that was started
+      Assert.assertTrue(txm.canCommit(txx, Collections.singleton(new byte[] { 0x0a })));
+      Assert.assertTrue(txm.commit(txx));
+
+      // now the committed change sets should be empty again
+      Assert.assertEquals(0, txm.getCommittedSize());
+      // cannot commit transaction as it was timed out
+      try {
+        txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
+        Assert.fail();
+      } catch (TransactionNotInProgressException e) {
+        // expected
+      }
+      txm.abort(tx1);
+      // abort should have removed from invalid
+      Assert.assertEquals(0, txm.getInvalidSize());
+      // run another bunch of transactions
+      for (int i = 0; i < 10; i++) {
+        Transaction tx = txm.startShort();
+        Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
+        Assert.assertTrue(txm.commit(tx));
+      }
+      // none of these should still be in the committed set (tx2 is long-running).
+      Assert.assertEquals(0, txm.getInvalidSize());
+      Assert.assertEquals(0, txm.getCommittedSize());
+      // commit tx2, abort tx3
+      Assert.assertTrue(txm.commit(tx2));
+      txm.abort(tx3);
+      // none of these should still be in the committed set (tx2 is long-running).
+      // Only tx3 is invalid list as it was aborted and is long-running. tx1 is short one and it rolled back its changes
+      // so it should NOT be in invalid list
+      Assert.assertEquals(1, txm.getInvalidSize());
+      Assert.assertEquals(tx3.getTransactionId(), (long) txm.getCurrentState().getInvalid().iterator().next());
+      Assert.assertEquals(1, txm.getExcludedListSize());
+    } finally {
+      txm.stopAndWait();
+    }
+  }
+
+  @Test
+  public void testLongTransactionCleanup() throws Exception {
+    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
+    conf.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2);
+    // using a new tx manager that cleans up
+    TransactionManager txm = new TransactionManager
+      (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+    txm.startAndWait();
+    try {
+      Assert.assertEquals(0, txm.getInvalidSize());
+      Assert.assertEquals(0, txm.getCommittedSize());
+      
+      // start a long running transaction
+      Transaction tx1 = txm.startLong();
+      
+      Assert.assertEquals(0, txm.getInvalidSize());
+      Assert.assertEquals(0, txm.getCommittedSize());
+
+      // sleep longer than the cleanup interval
+      TimeUnit.SECONDS.sleep(5);
+
+      // transaction should now be invalid
+      Assert.assertEquals(1, txm.getInvalidSize());
+      Assert.assertEquals(0, txm.getCommittedSize());
+
+      // cannot commit transaction as it was timed out
+      try {
+        txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
+        Assert.fail();
+      } catch (TransactionNotInProgressException e) {
+        // expected
+      }
+      
+      txm.abort(tx1);
+      // abort should not remove long running transaction from invalid list
+      Assert.assertEquals(1, txm.getInvalidSize());
+    } finally {
+      txm.stopAndWait();
+    }
+  }
+  
+  @Test
+  public void testTruncateInvalid() throws Exception {
+    InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage();
+    Configuration testConf = new Configuration(conf);
+    // No snapshots
+    testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1);
+    TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector());
+    txm1.startAndWait();
+
+    TransactionManager txm2 = null;
+    Transaction tx1;
+    Transaction tx2;
+    Transaction tx3;
+    Transaction tx4;
+    Transaction tx5;
+    Transaction tx6;
+    try {
+      Assert.assertEquals(0, txm1.getInvalidSize());
+
+      // start a few transactions
+      tx1 = txm1.startLong();
+      tx2 = txm1.startShort();
+      tx3 = txm1.startLong();
+      tx4 = txm1.startShort();
+      tx5 = txm1.startLong();
+      tx6 = txm1.startShort();
+
+      // invalidate tx1, tx2, tx5 and tx6
+      txm1.invalidate(tx1.getTransactionId());
+      txm1.invalidate(tx2.getTransactionId());
+      txm1.invalidate(tx5.getTransactionId());
+      txm1.invalidate(tx6.getTransactionId());
+
+      // tx1, tx2, tx5 and tx6 should be in invalid list
+      Assert.assertEquals(
+        ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(),
+            tx6.getTransactionId()),
+        txm1.getCurrentState().getInvalid()
+      );
+      
+      // remove tx1 and tx6 from invalid list
+      Assert.assertTrue(txm1.truncateInvalidTx(ImmutableSet.of(tx1.getTransactionId(), tx6.getTransactionId())));
+      
+      // only tx2 and tx5 should be in invalid list now
+      Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
+                          txm1.getCurrentState().getInvalid());
+      
+      // removing in-progress transactions should not have any effect
+      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+                          txm1.getCurrentState().getInProgress().keySet());
+      Assert.assertFalse(txm1.truncateInvalidTx(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId())));
+      // no change to in-progress
+      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+                          txm1.getCurrentState().getInProgress().keySet());
+      // no change to invalid list
+      Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
+                          txm1.getCurrentState().getInvalid());
+
+      // Test transaction edit logs replay
+      // Start another transaction manager without stopping txm1 so that snapshot does not get written,
+      // and all logs can be replayed.
+      txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector());
+      txm2.startAndWait();
+      Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
+                          txm2.getCurrentState().getInvalid());
+      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+                          txm2.getCurrentState().getInProgress().keySet());
+    } finally {
+      txm1.stopAndWait();
+      if (txm2 != null) {
+        txm2.stopAndWait();
+      }
+    }
+  }
+
+  @Test
+  public void testTruncateInvalidBeforeTime() throws Exception {
+    InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage();
+    Configuration testConf = new Configuration(conf);
+    // No snapshots
+    testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1);
+    TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector());
+    txm1.startAndWait();
+
+    TransactionManager txm2 = null;
+    Transaction tx1;
+    Transaction tx2;
+    Transaction tx3;
+    Transaction tx4;
+    Transaction tx5;
+    Transaction tx6;
+    try {
+      Assert.assertEquals(0, txm1.getInvalidSize());
+
+      // start a few transactions
+      tx1 = txm1.startLong();
+      tx2 = txm1.startShort();
+      // Sleep so that transaction ids get generated a millisecond apart for assertion
+      // TEPHRA-63 should eliminate the need to sleep
+      TimeUnit.MILLISECONDS.sleep(1);
+      long timeBeforeTx3 = System.currentTimeMillis();
+      tx3 = txm1.startLong();
+      tx4 = txm1.startShort();
+      TimeUnit.MILLISECONDS.sleep(1);
+      long timeBeforeTx5 = System.currentTimeMillis();
+      tx5 = txm1.startLong();
+      tx6 = txm1.startShort();
+
+      // invalidate tx1, tx2, tx5 and tx6
+      txm1.invalidate(tx1.getTransactionId());
+      txm1.invalidate(tx2.getTransactionId());
+      txm1.invalidate(tx5.getTransactionId());
+      txm1.invalidate(tx6.getTransactionId());
+
+      // tx1, tx2, tx5 and tx6 should be in invalid list
+      Assert.assertEquals(
+        ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(),
+            tx6.getTransactionId()),
+        txm1.getCurrentState().getInvalid()
+      );
+
+      // remove transactions before tx3 from invalid list
+      Assert.assertTrue(txm1.truncateInvalidTxBefore(timeBeforeTx3));
+
+      // only tx5 and tx6 should be in invalid list now
+      Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
+                          txm1.getCurrentState().getInvalid());
+
+      // removing invalid transactions before tx5 should throw exception since tx3 and tx4 are in-progress
+      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+                          txm1.getCurrentState().getInProgress().keySet());
+      try {
+        txm1.truncateInvalidTxBefore(timeBeforeTx5);
+        Assert.fail("Expected InvalidTruncateTimeException exception");
+      } catch (InvalidTruncateTimeException e) {
+        // Expected exception
+      }
+      // no change to in-progress
+      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+                          txm1.getCurrentState().getInProgress().keySet());
+      // no change to invalid list
+      Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
+                          txm1.getCurrentState().getInvalid());
+
+      // Test transaction edit logs replay
+      // Start another transaction manager without stopping txm1 so that snapshot does not get written, 
+      // and all logs can be replayed.
+      txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector());
+      txm2.startAndWait();
+      Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
+                          txm2.getCurrentState().getInvalid());
+      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+                          txm2.getCurrentState().getInProgress().keySet());
+    } finally {
+      txm1.stopAndWait();
+      if (txm2 != null) {
+        txm2.stopAndWait();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java
new file mode 100644
index 0000000..fa12acb
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Test for verifying TransactionServiceMain works correctly.
+ */
+public class TransactionServiceMainTest {
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testClientServer() throws Exception {
+    // Simply start a transaction server and connect to it with the client.
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+    zkServer.startAndWait();
+
+    try {
+      Configuration conf = new Configuration();
+      conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
+      conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+
+      final TransactionServiceMain main = new TransactionServiceMain(conf);
+      final CountDownLatch latch = new CountDownLatch(1);
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            main.start();
+            latch.countDown();
+          } catch (Exception e) {
+            throw Throwables.propagate(e);
+          }
+        }
+      };
+
+      try {
+        t.start();
+        // Wait for service to startup
+        latch.await();
+        TransactionServiceClient.doMain(true, conf);
+      } finally {
+        main.stop();
+        t.join();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
new file mode 100644
index 0000000..797c08a
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public abstract class TransactionSystemTest {
+
+  public static final byte[] C1 = new byte[] { 'c', '1' };
+  public static final byte[] C2 = new byte[] { 'c', '2' };
+  public static final byte[] C3 = new byte[] { 'c', '3' };
+  public static final byte[] C4 = new byte[] { 'c', '4' };
+
+  protected abstract TransactionSystemClient getClient() throws Exception;
+
+  protected abstract TransactionStateStorage getStateStorage() throws Exception;
+
+  @Test
+  public void testCommitRaceHandling() throws Exception {
+    TransactionSystemClient client1 = getClient();
+    TransactionSystemClient client2 = getClient();
+
+    Transaction tx1 = client1.startShort();
+    Transaction tx2 = client2.startShort();
+
+    Assert.assertTrue(client1.canCommit(tx1, asList(C1, C2)));
+    // second one also can commit even thought there are conflicts with first since first one hasn't committed yet
+    Assert.assertTrue(client2.canCommit(tx2, asList(C2, C3)));
+
+    Assert.assertTrue(client1.commit(tx1));
+
+    // now second one should not commit, since there are conflicts with tx1 that has been committed
+    Assert.assertFalse(client2.commit(tx2));
+  }
+
+  @Test
+  public void testMultipleCommitsAtSameTime() throws Exception {
+    // We want to check that if two txs finish at same time (wrt tx manager) they do not overwrite changesets of each
+    // other in tx manager used for conflicts detection (we had this bug)
+    // NOTE: you don't have to use multiple clients for that
+    TransactionSystemClient client1 = getClient();
+    TransactionSystemClient client2 = getClient();
+    TransactionSystemClient client3 = getClient();
+    TransactionSystemClient client4 = getClient();
+    TransactionSystemClient client5 = getClient();
+
+    Transaction tx1 = client1.startShort();
+    Transaction tx2 = client2.startShort();
+    Transaction tx3 = client3.startShort();
+    Transaction tx4 = client4.startShort();
+    Transaction tx5 = client5.startShort();
+
+    Assert.assertTrue(client1.canCommit(tx1, asList(C1)));
+    Assert.assertTrue(client1.commit(tx1));
+
+    Assert.assertTrue(client2.canCommit(tx2, asList(C2)));
+    Assert.assertTrue(client2.commit(tx2));
+
+    // verifying conflicts detection
+    Assert.assertFalse(client3.canCommit(tx3, asList(C1)));
+    Assert.assertFalse(client4.canCommit(tx4, asList(C2)));
+    Assert.assertTrue(client5.canCommit(tx5, asList(C3)));
+  }
+
+  @Test
+  public void testCommitTwice() throws Exception {
+    TransactionSystemClient client = getClient();
+    Transaction tx = client.startShort();
+
+    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    Assert.assertTrue(client.commit(tx));
+    // cannot commit twice same tx
+    try {
+      Assert.assertFalse(client.commit(tx));
+      Assert.fail();
+    } catch (TransactionNotInProgressException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testAbortTwice() throws Exception {
+    TransactionSystemClient client = getClient();
+    Transaction tx = client.startShort();
+
+    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    client.abort(tx);
+    // abort of not active tx has no affect
+    client.abort(tx);
+  }
+
+  @Test
+  public void testReuseTx() throws Exception {
+    TransactionSystemClient client = getClient();
+    Transaction tx = client.startShort();
+
+    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    Assert.assertTrue(client.commit(tx));
+    // can't re-use same tx again
+    try {
+      client.canCommit(tx, asList(C3, C4));
+      Assert.fail();
+    } catch (TransactionNotInProgressException e) {
+      // expected
+    }
+    try {
+      Assert.assertFalse(client.commit(tx));
+      Assert.fail();
+    } catch (TransactionNotInProgressException e) {
+      // expected
+    }
+
+    // abort of not active tx has no affect
+    client.abort(tx);
+  }
+
+  @Test
+  public void testUseNotStarted() throws Exception {
+    TransactionSystemClient client = getClient();
+    Transaction tx1 = client.startShort();
+    Assert.assertTrue(client.commit(tx1));
+
+    // we know this is one is older than current writePointer and was not used
+    Transaction txOld = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() - 1,
+                                        new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, 
+                                        TransactionType.SHORT);
+    try {
+      Assert.assertFalse(client.canCommit(txOld, asList(C3, C4)));
+      Assert.fail();
+    } catch (TransactionNotInProgressException e) {
+      // expected
+    }
+    try {
+      Assert.assertFalse(client.commit(txOld));
+      Assert.fail();
+    } catch (TransactionNotInProgressException e) {
+      // expected
+    }
+    // abort of not active tx has no affect
+    client.abort(txOld);
+
+    // we know this is one is newer than current readPointer and was not used
+    Transaction txNew = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() + 1,
+                                        new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, 
+                                        TransactionType.SHORT);
+    try {
+      Assert.assertFalse(client.canCommit(txNew, asList(C3, C4)));
+      Assert.fail();
+    } catch (TransactionNotInProgressException e) {
+      // expected
+    }
+    try {
+      Assert.assertFalse(client.commit(txNew));
+      Assert.fail();
+    } catch (TransactionNotInProgressException e) {
+      // expected
+    }
+    // abort of not active tx has no affect
+    client.abort(txNew);
+  }
+
+  @Test
+  public void testAbortAfterCommit() throws Exception {
+    TransactionSystemClient client = getClient();
+    Transaction tx = client.startShort();
+
+    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    Assert.assertTrue(client.commit(tx));
+    // abort of not active tx has no affect
+    client.abort(tx);
+  }
+
+  // todo add test invalidate method
+  @Test
+  public void testInvalidateTx() throws Exception {
+    TransactionSystemClient client = getClient();
+    // Invalidate an in-progress tx
+    Transaction tx1 = client.startShort();
+    client.canCommit(tx1, asList(C1, C2));
+    Assert.assertTrue(client.invalidate(tx1.getTransactionId()));
+    // Cannot invalidate a committed tx
+    Transaction tx2 = client.startShort();
+    client.canCommit(tx2, asList(C3, C4));
+    client.commit(tx2);
+    Assert.assertFalse(client.invalidate(tx2.getTransactionId()));
+  }
+
+  @Test
+  public void testResetState() throws Exception {
+    // have tx in progress, committing and committed then reset,
+    // get the last snapshot and see that it is empty
+    TransactionSystemClient client = getClient();
+    TransactionStateStorage stateStorage = getStateStorage();
+
+    Transaction tx1 = client.startShort();
+    Transaction tx2 = client.startShort();
+    client.canCommit(tx1, asList(C1, C2));
+    client.commit(tx1);
+    client.canCommit(tx2, asList(C3, C4));
+
+    Transaction txPreReset = client.startShort();
+    long currentTs = System.currentTimeMillis();
+    client.resetState();
+
+    TransactionSnapshot snapshot = stateStorage.getLatestSnapshot();
+    Assert.assertTrue(snapshot.getTimestamp() >= currentTs);
+    Assert.assertEquals(0, snapshot.getInvalid().size());
+    Assert.assertEquals(0, snapshot.getInProgress().size());
+    Assert.assertEquals(0, snapshot.getCommittingChangeSets().size());
+    Assert.assertEquals(0, snapshot.getCommittedChangeSets().size());
+
+    // confirm that transaction IDs are not reset
+    Transaction txPostReset = client.startShort();
+    Assert.assertTrue("New tx ID should be greater than last ID before reset",
+                      txPostReset.getTransactionId() > txPreReset.getTransactionId());
+  }
+  
+  @Test
+  public void testTruncateInvalidTx() throws Exception {
+    // Start few transactions and invalidate all of them
+    TransactionSystemClient client = getClient();
+    Transaction tx1 = client.startLong();
+    Transaction tx2 = client.startShort();
+    Transaction tx3 = client.startLong();
+    
+    client.invalidate(tx1.getTransactionId());
+    client.invalidate(tx2.getTransactionId());
+    client.invalidate(tx3.getTransactionId());
+    
+    // Remove tx2 and tx3 from invalid list
+    Assert.assertTrue(client.truncateInvalidTx(ImmutableSet.of(tx2.getTransactionId(), tx3.getTransactionId())));
+    
+    Transaction tx = client.startShort();
+    // Only tx1 should be in invalid list now
+    Assert.assertArrayEquals(new long[] {tx1.getTransactionId()}, tx.getInvalids());
+    client.abort(tx);
+  }
+
+  @Test
+  public void testTruncateInvalidTxBefore() throws Exception {
+    // Start few transactions
+    TransactionSystemClient client = getClient();
+    Transaction tx1 = client.startLong();
+    Transaction tx2 = client.startShort();
+    // Sleep so that transaction ids get generated a millisecond apart for assertion
+    // TEPHRA-63 should eliminate the need to sleep
+    TimeUnit.MILLISECONDS.sleep(1);
+    long beforeTx3 = System.currentTimeMillis();
+    Transaction tx3 = client.startLong();
+    
+    try {
+      // throws exception since tx1 and tx2 are still in-progress
+      client.truncateInvalidTxBefore(beforeTx3);
+      Assert.fail("Expected InvalidTruncateTimeException exception");
+    } catch (InvalidTruncateTimeException e) {
+      // Expected exception
+    }
+    
+    // Invalidate all of them
+    client.invalidate(tx1.getTransactionId());
+    client.invalidate(tx2.getTransactionId());
+    client.invalidate(tx3.getTransactionId());
+
+    // Remove transactions before time beforeTx3
+    Assert.assertTrue(client.truncateInvalidTxBefore(beforeTx3));
+
+    Transaction tx = client.startShort();
+    // Only tx3 should be in invalid list now
+    Assert.assertArrayEquals(new long[] {tx3.getTransactionId()}, tx.getInvalids());
+    client.abort(tx);
+  }
+
+  @Test
+  public void testGetInvalidSize() throws Exception {
+    // Start few transactions and invalidate all of them
+    TransactionSystemClient client = getClient();
+    Transaction tx1 = client.startLong();
+    Transaction tx2 = client.startShort();
+    Transaction tx3 = client.startLong();
+
+    Assert.assertEquals(0, client.getInvalidSize());
+
+    client.invalidate(tx1.getTransactionId());
+    client.invalidate(tx2.getTransactionId());
+    client.invalidate(tx3.getTransactionId());
+
+    Assert.assertEquals(3, client.getInvalidSize());
+  }
+
+  private Collection<byte[]> asList(byte[]... val) {
+    return Arrays.asList(val);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java
new file mode 100644
index 0000000..08961f4
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.primitives.Longs;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ *
+ */
+public class TransactionTest {
+  // Current transaction id
+  private final long txId = 200L;
+  // Read pointer for current transaction
+  private final long readPointer = 100L;
+  // Transactions committed before current transaction was started.
+  private final Set<Long> priorCommitted = ImmutableSet.of(80L, 99L, 100L);
+  // Transactions committed after current transaction was started.
+  private final Set<Long> postCommitted = ImmutableSet.of(150L, 180L, 210L);
+  // Invalid transactions before current transaction was started.
+  private final Set<Long> priorInvalids = ImmutableSet.of(90L, 110L, 190L);
+  // Invalid transactions after current transaction was started.
+  private final Set<Long> postInvalids = ImmutableSet.of(201L, 221L, 231L);
+  // Transactions in progress before current transaction was started.
+  private final Set<Long> priorInProgress = ImmutableSet.of(95L, 120L, 150L);
+  // Transactions in progress after current transaction was started.
+  private final Set<Long> postInProgress = ImmutableSet.of(205L, 215L, 300L);
+
+  @Test
+  public void testSnapshotVisibility() throws Exception {
+    Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT;
+
+    Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
+    Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
+                                     toSortedArray(priorInProgress), 95L,
+                                     TransactionType.SHORT, toSortedArray(checkPointers),
+                                     visibilityLevel);
+    Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L);
+    Set<Long> notVisibleCurrent = ImmutableSet.of();
+    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+                     visibleCurrent, notVisibleCurrent, tx);
+
+    checkPointers = ImmutableSet.of();
+    tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L,
+                                     TransactionType.SHORT, toSortedArray(checkPointers),
+                         visibilityLevel);
+    visibleCurrent = ImmutableSet.of(txId);
+    notVisibleCurrent = ImmutableSet.of();
+    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+                     visibleCurrent, notVisibleCurrent, tx);
+  }
+
+  @Test
+  public void testSnapshotExcludeVisibility() throws Exception {
+    Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+
+    Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
+    Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
+                                     toSortedArray(priorInProgress), 95L,
+                                     TransactionType.SHORT, toSortedArray(checkPointers),
+                                     visibilityLevel);
+    Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L);
+    Set<Long> notVisibleCurrent = ImmutableSet.of(250L);
+    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+                     visibleCurrent, notVisibleCurrent, tx);
+
+    checkPointers = ImmutableSet.of();
+    tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L,
+                         TransactionType.SHORT, toSortedArray(checkPointers),
+                         visibilityLevel);
+    visibleCurrent = ImmutableSet.of();
+    notVisibleCurrent = ImmutableSet.of(txId);
+    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+                     visibleCurrent, notVisibleCurrent, tx);
+  }
+
+  @Test
+  public void testSnapshotAllVisibility() throws Exception {
+    Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_ALL;
+
+    Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
+    Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
+                                     toSortedArray(priorInProgress), 95L,
+                                     TransactionType.SHORT, toSortedArray(checkPointers),
+                                     visibilityLevel);
+    Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L);
+    Set<Long> notVisibleCurrent = ImmutableSet.of();
+    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+                     visibleCurrent, notVisibleCurrent, tx);
+
+    checkPointers = ImmutableSet.of();
+    tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids),
+                         toSortedArray(priorInProgress), 95L,
+                         TransactionType.SHORT, toSortedArray(checkPointers),
+                         visibilityLevel);
+    visibleCurrent = ImmutableSet.of(txId);
+    notVisibleCurrent = ImmutableSet.of();
+    assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+                     visibleCurrent, notVisibleCurrent, tx);
+  }
+
+  private void assertVisibility(Set<Long> priorCommitted, Set<Long> postCommitted, Set<Long> priorInvalids,
+                                Set<Long> postInvalids, Set<Long> priorInProgress, Set<Long> postInProgress,
+                                Set<Long> visibleCurrent, Set<Long> notVisibleCurrent,
+                                Transaction tx) {
+    // Verify visible snapshots of tx are visible
+    for (long t : visibleCurrent) {
+      Assert.assertTrue("Assertion error for version = " + t, tx.isVisible(t));
+    }
+
+    // Verify not visible snapshots of tx are not visible
+    for (long t : notVisibleCurrent) {
+      Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
+    }
+
+    // Verify prior committed versions are visible
+    for (long t : priorCommitted) {
+      Assert.assertTrue("Assertion error for version = " + t, tx.isVisible(t));
+    }
+
+    // Verify versions committed after tx started, and not part of tx are not visible
+    for (long t : postCommitted) {
+      Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
+    }
+
+    // Verify invalid and in-progress versions are not visible
+    for (long t : Iterables.concat(priorInvalids, postInvalids, priorInProgress, postInProgress)) {
+      Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
+    }
+  }
+
+  private long[] toSortedArray(Set<Long> set) {
+    long[] array = Longs.toArray(set);
+    Arrays.sort(array);
+    return array;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java
new file mode 100644
index 0000000..6a30280
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import com.google.common.base.Throwables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests for {@link ElasticPool}.
+ */
+public class ElasticPoolTest {
+
+  static class Dummy {
+    static AtomicInteger count = new AtomicInteger(0);
+    boolean valid = true;
+    Dummy() {
+      count.incrementAndGet();
+    }
+    void markInvalid() {
+      valid = false;
+    }
+
+    public boolean isValid() {
+      return valid;
+    }
+  }
+
+  class DummyPool extends ElasticPool<Dummy, RuntimeException> {
+
+    public DummyPool(int sizeLimit) {
+      super(sizeLimit);
+    }
+
+    @Override
+    protected Dummy create() {
+      return new Dummy();
+    }
+
+    @Override
+    protected boolean recycle(Dummy element) {
+      return element.isValid();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testFewerThreadsThanElements() throws InterruptedException {
+    final DummyPool pool = new DummyPool(5);
+    Dummy.count.set(0);
+    createAndRunThreads(2, pool, false);
+    // we only ran 2 threads, so only 2 elements got created, even though pool size is 5
+    Assert.assertEquals(2, Dummy.count.get());
+  }
+
+  @Test(timeout = 5000)
+  public void testMoreThreadsThanElements() throws InterruptedException {
+    final DummyPool pool = new DummyPool(2);
+    Dummy.count.set(0);
+    createAndRunThreads(5, pool, false);
+    // even though we ran 5 threads, only 2 elements got created because pool size is 2
+    Assert.assertEquals(2, Dummy.count.get());
+  }
+
+  @Test(timeout = 5000)
+  public void testMoreThreadsThanElementsWithDiscard() throws InterruptedException {
+    final DummyPool pool = new DummyPool(2);
+    Dummy.count.set(0);
+    int numThreads = 3;
+    // pass 'true' as the last parameter, which results in the elements being discarded after each obtain() call.
+    createAndRunThreads(numThreads, pool, true);
+    // this results in (5 * numThreads) number of elements being created since each thread obtains a client 5 times.
+    Assert.assertEquals(5 * numThreads, Dummy.count.get());
+  }
+
+  // Creates a list of threads which obtain a client from the pool, sleeps for a certain amount of time, and then
+  // releases the client back to the pool, optionally marking it invalid before doing so. It repeats this five times.
+  // Then, runs these threads to completion.
+  private void createAndRunThreads(int numThreads, final DummyPool pool,
+                                   final boolean discardAtEnd) throws InterruptedException {
+    Thread[] threads = new Thread[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          for (int j = 0; j < 5; ++j) {
+            Dummy dummy;
+            try {
+              dummy = pool.obtain();
+            } catch (InterruptedException e) {
+              throw Throwables.propagate(e);
+            }
+            try {
+              Thread.sleep(10L);
+            } catch (InterruptedException e) {
+              // ignored
+            }
+            if (discardAtEnd) {
+              dummy.markInvalid();
+            }
+            pool.release(dummy);
+          }
+        }
+      };
+    }
+    for (Thread t : threads) {
+      t.start();
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
new file mode 100644
index 0000000..90a69e9
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import com.google.common.base.Throwables;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TransactionServiceMain;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+public class PooledClientProviderTest {
+
+  public static final int MAX_CLIENT_COUNT = 3;
+  public static final long CLIENT_OBTAIN_TIMEOUT = 10;
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testClientConnectionPoolMaximumNumberOfClients() throws Exception {
+    // We need a server for the client to connect to
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+    zkServer.startAndWait();
+
+    try {
+      Configuration conf = new Configuration();
+      conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
+      conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+      conf.set("data.tx.client.count", Integer.toString(MAX_CLIENT_COUNT));
+      conf.set("data.tx.client.obtain.timeout", Long.toString(CLIENT_OBTAIN_TIMEOUT));
+
+      final TransactionServiceMain main = new TransactionServiceMain(conf);
+      final CountDownLatch latch = new CountDownLatch(1);
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            main.start();
+            latch.countDown();
+          } catch (Exception e) {
+            throw Throwables.propagate(e);
+          }
+        }
+      };
+
+      try {
+        t.start();
+        // Wait for service to startup
+        latch.await();
+
+        startClientAndTestPool(conf);
+      } finally {
+        main.stop();
+        t.join();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  private void startClientAndTestPool(Configuration conf) throws Exception {
+    Injector injector = Guice.createInjector(
+      new ConfigModule(conf),
+      new ZKModule(),
+      new DiscoveryModules().getDistributedModules(),
+      new TransactionModules().getDistributedModules(),
+      new TransactionClientModule()
+    );
+
+    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
+    zkClient.startAndWait();
+
+    final PooledClientProvider clientProvider = new PooledClientProvider(conf,
+      injector.getInstance(DiscoveryServiceClient.class));
+
+    // test simple case of get + return. Note: this also initializes the provider's pool, which
+    // takes about one second (discovery). Doing it before we test the threads makes it so that one
+    // thread doesn't take exceptionally longer than the others.
+    try (CloseableThriftClient closeableThriftClient = clientProvider.getCloseableClient()) {
+      // do nothing with the client
+    }
+
+    //Now race to get MAX_CLIENT_COUNT+1 clients, exhausting the pool and requesting 1 more.
+    List<Future<Integer>> clientIds = new ArrayList<Future<Integer>>();
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    ExecutorService executor = Executors.newFixedThreadPool(MAX_CLIENT_COUNT + 1);
+    for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) {
+      clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT / 2, countDownLatch)));
+    }
+    countDownLatch.countDown();
+
+    Set<Integer> ids = new HashSet<Integer>();
+    for (Future<Integer> id : clientIds) {
+      ids.add(id.get());
+    }
+    Assert.assertEquals(MAX_CLIENT_COUNT, ids.size());
+
+    // now, try it again with, where each thread holds onto the client for twice the client.obtain.timeout value.
+    // one of the threads should throw a TimeOutException, because the other threads don't release their clients
+    // within the configured timeout.
+    countDownLatch = new CountDownLatch(1);
+    for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) {
+      clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT * 2, countDownLatch)));
+    }
+    countDownLatch.countDown();
+    int numTimeoutExceptions = 0;
+    for (Future<Integer> clientId : clientIds) {
+      try {
+        clientId.get();
+      } catch (ExecutionException expected) {
+        Assert.assertEquals(TimeoutException.class, expected.getCause().getClass());
+        numTimeoutExceptions++;
+      }
+    }
+    // expect that exactly one of the threads hit the TimeoutException
+    Assert.assertEquals(String.format("Expected one thread to not obtain a client within %s milliseconds.",
+                                      CLIENT_OBTAIN_TIMEOUT),
+                        1, numTimeoutExceptions);
+
+    executor.shutdown();
+  }
+
+  private static class RetrieveClient implements Callable<Integer> {
+    private final PooledClientProvider pool;
+    private final long holdClientMs;
+    private final CountDownLatch begin;
+
+    public RetrieveClient(PooledClientProvider pool, long holdClientMs,
+                          CountDownLatch begin) {
+      this.pool = pool;
+      this.holdClientMs = holdClientMs;
+      this.begin = begin;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+      begin.await();
+      try (CloseableThriftClient client = pool.getCloseableClient()) {
+        int id = System.identityHashCode(client.getThriftClient());
+        // "use" the client for a configured amount of milliseconds
+        Thread.sleep(holdClientMs);
+        return id;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
new file mode 100644
index 0000000..a930720
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.ThriftTransactionSystemTest;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionEdit;
+import org.apache.tephra.persist.TransactionLog;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This tests whether transaction service hangs on stop when heavily loaded - https://issues.cask.co/browse/TEPHRA-132
+ */
+public class ThriftTransactionServerTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);
+
+  private static InMemoryZKServer zkServer;
+  private static ZKClientService zkClientService;
+  private static TransactionService txService;
+  private static TransactionStateStorage storage;
+  static Injector injector;
+
+  private static final int NUM_CLIENTS = 17;
+  private static final CountDownLatch STORAGE_WAIT_LATCH = new CountDownLatch(1);
+  private static final CountDownLatch CLIENTS_DONE_LATCH = new CountDownLatch(NUM_CLIENTS);
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void start() throws Exception {
+    zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+    zkServer.startAndWait();
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+    conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
+    conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+    conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+    conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT, NUM_CLIENTS);
+    conf.setLong(TxConstants.Service.CFG_DATA_TX_CLIENT_TIMEOUT, TimeUnit.HOURS.toMillis(1));
+    conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS, 2);
+    conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS, 4);
+
+    injector = Guice.createInjector(
+      new ConfigModule(conf),
+      new ZKModule(),
+      new DiscoveryModules().getDistributedModules(),
+      Modules.override(new TransactionModules().getDistributedModules())
+        .with(new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(TransactionStateStorage.class).to(SlowTransactionStorage.class).in(Scopes.SINGLETON);
+          }
+        }),
+      new TransactionClientModule()
+    );
+
+    zkClientService = injector.getInstance(ZKClientService.class);
+    zkClientService.startAndWait();
+
+    // start a tx server
+    txService = injector.getInstance(TransactionService.class);
+    storage = injector.getInstance(TransactionStateStorage.class);
+    try {
+      LOG.info("Starting transaction service");
+      txService.startAndWait();
+    } catch (Exception e) {
+      LOG.error("Failed to start service: ", e);
+    }
+  }
+
+  @Before
+  public void reset() throws Exception {
+    getClient().resetState();
+  }
+
+  @AfterClass
+  public static void stop() throws Exception {
+    txService.stopAndWait();
+    storage.stopAndWait();
+    zkClientService.stopAndWait();
+    zkServer.stopAndWait();
+  }
+
+  public TransactionSystemClient getClient() throws Exception {
+    return injector.getInstance(TransactionSystemClient.class);
+  }
+
+  @Test
+  public void testThriftServerStop() throws Exception {
+    int nThreads = NUM_CLIENTS;
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+    for (int i = 0; i < nThreads; ++i) {
+      executorService.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            TransactionSystemClient txClient = getClient();
+            CLIENTS_DONE_LATCH.countDown();
+            txClient.startShort();
+          } catch (Exception e) {
+            // Exception expected
+          }
+        }
+      });
+    }
+
+    // Wait till all clients finish sending reqeust to transaction manager
+    CLIENTS_DONE_LATCH.await();
+    TimeUnit.SECONDS.sleep(1);
+
+    // Expire zookeeper session, which causes Thrift server to stop.
+    expireZkSession(zkClientService);
+    waitForThriftTermination();
+
+    // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift sever again.
+    zkClientService.stopAndWait();
+    STORAGE_WAIT_LATCH.countDown();
+    TimeUnit.SECONDS.sleep(1);
+
+    // Make sure Thrift server stopped.
+    Assert.assertEquals(Service.State.TERMINATED, txService.thriftRPCServerState());
+  }
+
+  private void expireZkSession(ZKClientService zkClientService) throws Exception {
+    ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get();
+    final SettableFuture<?> connectFuture = SettableFuture.create();
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        if (event.getState() == Event.KeeperState.SyncConnected) {
+          connectFuture.set(null);
+        }
+      }
+    };
+
+    // Create another Zookeeper session with the same sessionId so that the original one expires.
+    final ZooKeeper dupZookeeper =
+      new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher,
+                    zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
+    connectFuture.get(30, TimeUnit.SECONDS);
+    Assert.assertEquals("Failed to re-create current session", dupZookeeper.getState(), ZooKeeper.States.CONNECTED);
+    dupZookeeper.close();
+  }
+
+  private void waitForThriftTermination() throws InterruptedException {
+    int count = 0;
+    while (txService.thriftRPCServerState() != Service.State.TERMINATED && count++ < 200) {
+      TimeUnit.MILLISECONDS.sleep(50);
+    }
+  }
+
+  private static class SlowTransactionStorage extends InMemoryTransactionStateStorage {
+    @Override
+    public TransactionLog createLog(long timestamp) throws IOException {
+      return new SlowTransactionLog(timestamp);
+    }
+  }
+
+  private static class SlowTransactionLog extends InMemoryTransactionStateStorage.InMemoryTransactionLog {
+    public SlowTransactionLog(long timestamp) {
+      super(timestamp);
+    }
+
+    @Override
+    public void append(TransactionEdit edit) throws IOException {
+      try {
+        STORAGE_WAIT_LATCH.await();
+      } catch (InterruptedException e) {
+        LOG.error("Got exception: ", e);
+      }
+      super.append(edit);
+    }
+
+    @Override
+    public void append(List<TransactionEdit> edits) throws IOException {
+      try {
+        STORAGE_WAIT_LATCH.await();
+      } catch (InterruptedException e) {
+        LOG.error("Got exception: ", e);
+      }
+      super.append(edits);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java b/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java
new file mode 100644
index 0000000..4828adf
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.util.ConfigurationFactory;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.List;
+
+/**
+ * Common test class for TransactionVisibilityFilter implementations.
+ */
+public abstract class AbstractTransactionVisibilityFilterTest {
+
+  protected static final byte[] FAM = new byte[] {'f'};
+  protected static final byte[] FAM2 = new byte[] {'f', '2'};
+  protected static final byte[] FAM3 = new byte[] {'f', '3'};
+  protected static final byte[] COL = new byte[] {'c'};
+  protected static final List<byte[]> EMPTY_CHANGESET = Lists.newArrayListWithCapacity(0);
+
+  protected TransactionManager txManager;
+
+  @Before
+  public void setup() throws Exception {
+    Configuration conf = new ConfigurationFactory().get();
+    conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
+    txManager = new TransactionManager(conf);
+    txManager.startAndWait();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    txManager.stopAndWait();
+  }
+}