You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:27 UTC
[27/56] [abbrv] [partial] incubator-tephra git commit: Rename package
to org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
new file mode 100644
index 0000000..28ccc6e
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.snapshot.DefaultSnapshotCodec;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Tests the transaction executor.
+ */
+public class TransactionExecutorTest {
+ private static DummyTxClient txClient;
+ private static TransactionExecutorFactory factory;
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ final Configuration conf = new Configuration();
+ conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+ Injector injector = Guice.createInjector(
+ new ConfigModule(conf),
+ new DiscoveryModules().getInMemoryModules(),
+ Modules.override(
+ new TransactionModules().getInMemoryModules()).with(new AbstractModule() {
+ @Override
+ protected void configure() {
+ TransactionManager txManager = new TransactionManager(conf);
+ txManager.startAndWait();
+ bind(TransactionManager.class).toInstance(txManager);
+ bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class);
+ }
+ }));
+
+ txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class);
+ factory = injector.getInstance(TransactionExecutorFactory.class);
+ }
+
+ final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
+ final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2);
+
+ private TransactionExecutor getExecutor() {
+ return factory.createExecutor(txAwares);
+ }
+
+ private TransactionExecutor getExecutorWithNoRetry() {
+ return new DefaultTransactionExecutor(txClient, txAwares, RetryStrategies.noRetries());
+ }
+
+ static final byte[] A = { 'a' };
+ static final byte[] B = { 'b' };
+
+ final TransactionExecutor.Function<Integer, Integer> testFunction =
+ new TransactionExecutor.Function<Integer, Integer>() {
+ @Override
+ public Integer apply(@Nullable Integer input) {
+ ds1.addChange(A);
+ ds2.addChange(B);
+ if (input == null) {
+ throw new RuntimeException("function failed");
+ }
+ return input * input;
+ }
+ };
+
+ @Before
+ public void resetTxAwares() {
+ ds1.reset();
+ ds2.reset();
+ }
+
+ @Test
+ public void testSuccessful() throws TransactionFailureException, InterruptedException {
+ // execute: add a change to ds1 and ds2
+ Integer result = getExecutor().execute(testFunction, 10);
+ // verify both are committed and post-committed
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertTrue(ds1.checked);
+ Assert.assertTrue(ds2.checked);
+ Assert.assertTrue(ds1.committed);
+ Assert.assertTrue(ds2.committed);
+ Assert.assertTrue(ds1.postCommitted);
+ Assert.assertTrue(ds2.postCommitted);
+ Assert.assertFalse(ds1.rolledBack);
+ Assert.assertFalse(ds2.rolledBack);
+ Assert.assertTrue(100 == result);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
+ }
+
+ @Test
+ public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
+ ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
+ // execute: add a change to ds1 and ds2
+ try {
+ getExecutor().execute(testFunction, 10);
+ Assert.fail("post commit failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("post failure", e.getCause().getMessage());
+ }
+ // verify both are committed and post-committed
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertTrue(ds1.checked);
+ Assert.assertTrue(ds2.checked);
+ Assert.assertTrue(ds1.committed);
+ Assert.assertTrue(ds2.committed);
+ Assert.assertTrue(ds1.postCommitted);
+ Assert.assertTrue(ds2.postCommitted);
+ Assert.assertFalse(ds1.rolledBack);
+ Assert.assertFalse(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
+ }
+
+ @Test
+ public void testPersistFailure() throws TransactionFailureException, InterruptedException {
+ ds1.failCommitTxOnce = InduceFailure.ThrowException;
+ // execute: add a change to ds1 and ds2
+ try {
+ getExecutor().execute(testFunction, 10);
+ Assert.fail("persist failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("persist failure", e.getCause().getMessage());
+ }
+ // verify both are rolled back and tx is aborted
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertTrue(ds1.checked);
+ Assert.assertTrue(ds2.checked);
+ Assert.assertTrue(ds1.committed);
+ Assert.assertFalse(ds2.committed);
+ Assert.assertFalse(ds1.postCommitted);
+ Assert.assertFalse(ds2.postCommitted);
+ Assert.assertTrue(ds1.rolledBack);
+ Assert.assertTrue(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+ }
+
+ @Test
+ public void testPersistFalse() throws TransactionFailureException, InterruptedException {
+ ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+ // execute: add a change to ds1 and ds2
+ try {
+ getExecutor().execute(testFunction, 10);
+ Assert.fail("persist failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
+ }
+ // verify both are rolled back and tx is aborted
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertTrue(ds1.checked);
+ Assert.assertTrue(ds2.checked);
+ Assert.assertTrue(ds1.committed);
+ Assert.assertFalse(ds2.committed);
+ Assert.assertFalse(ds1.postCommitted);
+ Assert.assertFalse(ds2.postCommitted);
+ Assert.assertTrue(ds1.rolledBack);
+ Assert.assertTrue(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+ }
+
+ @Test
+ public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+ ds1.failCommitTxOnce = InduceFailure.ThrowException;
+ ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+ // execute: add a change to ds1 and ds2
+ try {
+ getExecutor().execute(testFunction, 10);
+ Assert.fail("persist failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("persist failure", e.getCause().getMessage());
+ }
+ // verify both are rolled back and tx is invalidated
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertTrue(ds1.checked);
+ Assert.assertTrue(ds2.checked);
+ Assert.assertTrue(ds1.committed);
+ Assert.assertFalse(ds2.committed);
+ Assert.assertFalse(ds1.postCommitted);
+ Assert.assertFalse(ds2.postCommitted);
+ Assert.assertTrue(ds1.rolledBack);
+ Assert.assertTrue(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
+ }
+
+ @Test
+ public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
+ ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+ ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+ // execute: add a change to ds1 and ds2
+ try {
+ getExecutor().execute(testFunction, 10);
+ Assert.fail("persist failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
+ }
+ // verify both are rolled back and tx is invalidated
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertTrue(ds1.checked);
+ Assert.assertTrue(ds2.checked);
+ Assert.assertTrue(ds1.committed);
+ Assert.assertFalse(ds2.committed);
+ Assert.assertFalse(ds1.postCommitted);
+ Assert.assertFalse(ds2.postCommitted);
+ Assert.assertTrue(ds1.rolledBack);
+ Assert.assertTrue(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
+ }
+
+ @Test
+ public void testNoIndefiniteRetryByDefault() throws TransactionFailureException, InterruptedException {
+ // retries must be bounded by default: if commits keep failing, the executor should give up and throw
+ txClient.failCommits = 1000;
+ try {
+ // execute: add a change to ds1 and ds2
+ getExecutor().execute(testFunction, 10);
+ Assert.fail("commit failed too many times to retry - exception should be thrown");
+ } catch (TransactionConflictException e) {
+ Assert.assertNull(e.getCause());
+ }
+
+ txClient.failCommits = 0;
+
+ // verify both are rolled back and tx is aborted
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertTrue(ds1.checked);
+ Assert.assertTrue(ds2.checked);
+ Assert.assertTrue(ds1.committed);
+ Assert.assertTrue(ds2.committed);
+ Assert.assertFalse(ds1.postCommitted);
+ Assert.assertFalse(ds2.postCommitted);
+ Assert.assertTrue(ds1.rolledBack);
+ Assert.assertTrue(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+ }
+
+ @Test
+ public void testRetryByDefault() throws TransactionFailureException, InterruptedException {
+ // we want retry by default, so that engineers don't miss it
+ txClient.failCommits = 2;
+ // execute: add a change to ds1 and ds2
+ getExecutor().execute(testFunction, 10);
+ // should not fail, but continue
+
+ // verify both are committed
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertTrue(ds1.checked);
+ Assert.assertTrue(ds2.checked);
+ Assert.assertTrue(ds1.committed);
+ Assert.assertTrue(ds2.committed);
+ Assert.assertTrue(ds1.postCommitted);
+ Assert.assertTrue(ds2.postCommitted);
+ Assert.assertFalse(ds1.rolledBack);
+ Assert.assertFalse(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
+ }
+
+ @Test
+ public void testCommitFalse() throws TransactionFailureException, InterruptedException {
+ txClient.failCommits = 1;
+ // execute: add a change to ds1 and ds2
+ try {
+ getExecutorWithNoRetry().execute(testFunction, 10);
+ Assert.fail("commit failed - exception should be thrown");
+ } catch (TransactionConflictException e) {
+ Assert.assertNull(e.getCause());
+ }
+ // verify both are rolled back and tx is aborted
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertTrue(ds1.checked);
+ Assert.assertTrue(ds2.checked);
+ Assert.assertTrue(ds1.committed);
+ Assert.assertTrue(ds2.committed);
+ Assert.assertFalse(ds1.postCommitted);
+ Assert.assertFalse(ds2.postCommitted);
+ Assert.assertTrue(ds1.rolledBack);
+ Assert.assertTrue(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+ }
+
+ @Test
+ public void testCanCommitFalse() throws TransactionFailureException, InterruptedException {
+ txClient.failCanCommitOnce = true;
+ // execute: add a change to ds1 and ds2
+ try {
+ getExecutorWithNoRetry().execute(testFunction, 10);
+ Assert.fail("commit failed - exception should be thrown");
+ } catch (TransactionConflictException e) {
+ Assert.assertNull(e.getCause());
+ }
+ // verify both are rolled back and tx is aborted
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertTrue(ds1.checked);
+ Assert.assertTrue(ds2.checked);
+ Assert.assertFalse(ds1.committed);
+ Assert.assertFalse(ds2.committed);
+ Assert.assertFalse(ds1.postCommitted);
+ Assert.assertFalse(ds2.postCommitted);
+ Assert.assertTrue(ds1.rolledBack);
+ Assert.assertTrue(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+ }
+
+ @Test
+ public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+ ds1.failChangesTxOnce = InduceFailure.ThrowException;
+ ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+ // execute: add a change to ds1 and ds2
+ try {
+ getExecutor().execute(testFunction, 10);
+ Assert.fail("get changes failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("changes failure", e.getCause().getMessage());
+ }
+ // verify both are rolled back and tx is invalidated
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertTrue(ds1.checked);
+ Assert.assertFalse(ds2.checked);
+ Assert.assertFalse(ds1.committed);
+ Assert.assertFalse(ds2.committed);
+ Assert.assertFalse(ds1.postCommitted);
+ Assert.assertFalse(ds2.postCommitted);
+ Assert.assertTrue(ds1.rolledBack);
+ Assert.assertTrue(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
+ }
+
+ @Test
+ public void testFunctionAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+ ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+ // execute: add a change to ds1 and ds2
+ try {
+ getExecutor().execute(testFunction, null);
+ Assert.fail("function failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("function failed", e.getCause().getMessage());
+ }
+ // verify both are rolled back and tx is invalidated
+ Assert.assertTrue(ds1.started);
+ Assert.assertTrue(ds2.started);
+ Assert.assertFalse(ds1.checked);
+ Assert.assertFalse(ds2.checked);
+ Assert.assertFalse(ds1.committed);
+ Assert.assertFalse(ds2.committed);
+ Assert.assertFalse(ds1.postCommitted);
+ Assert.assertFalse(ds2.postCommitted);
+ Assert.assertTrue(ds1.rolledBack);
+ Assert.assertTrue(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
+ }
+
+ @Test
+ public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+ ds1.failStartTxOnce = InduceFailure.ThrowException;
+ // execute: add a change to ds1 and ds2
+ try {
+ getExecutor().execute(testFunction, 10);
+ Assert.fail("start failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("start failure", e.getCause().getMessage());
+ }
+ // verify both are not rolled back and tx is aborted
+ Assert.assertTrue(ds1.started);
+ Assert.assertFalse(ds2.started);
+ Assert.assertFalse(ds1.checked);
+ Assert.assertFalse(ds2.checked);
+ Assert.assertFalse(ds1.committed);
+ Assert.assertFalse(ds2.committed);
+ Assert.assertFalse(ds1.postCommitted);
+ Assert.assertFalse(ds2.postCommitted);
+ Assert.assertFalse(ds1.rolledBack);
+ Assert.assertFalse(ds2.rolledBack);
+ Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
+ }
+
+ enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
+
+ static class DummyTxAware implements TransactionAware {
+
+ Transaction tx;
+ boolean started = false;
+ boolean committed = false;
+ boolean checked = false;
+ boolean rolledBack = false;
+ boolean postCommitted = false;
+ List<byte[]> changes = Lists.newArrayList();
+
+ InduceFailure failStartTxOnce = InduceFailure.NoFailure;
+ InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
+ InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
+ InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
+ InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
+
+ void addChange(byte[] key) {
+ changes.add(key);
+ }
+
+ void reset() {
+ tx = null;
+ started = false;
+ checked = false;
+ committed = false;
+ rolledBack = false;
+ postCommitted = false;
+ changes.clear();
+ }
+
+ @Override
+ public void startTx(Transaction tx) {
+ reset();
+ started = true;
+ this.tx = tx;
+ if (failStartTxOnce == InduceFailure.ThrowException) {
+ failStartTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("start failure");
+ }
+ }
+
+ @Override
+ public void updateTx(Transaction tx) {
+ this.tx = tx;
+ }
+
+ @Override
+ public Collection<byte[]> getTxChanges() {
+ checked = true;
+ if (failChangesTxOnce == InduceFailure.ThrowException) {
+ failChangesTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("changes failure");
+ }
+ return ImmutableList.copyOf(changes);
+ }
+
+ @Override
+ public boolean commitTx() throws Exception {
+ committed = true;
+ if (failCommitTxOnce == InduceFailure.ThrowException) {
+ failCommitTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("persist failure");
+ }
+ if (failCommitTxOnce == InduceFailure.ReturnFalse) {
+ failCommitTxOnce = InduceFailure.NoFailure;
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void postTxCommit() {
+ postCommitted = true;
+ if (failPostCommitTxOnce == InduceFailure.ThrowException) {
+ failPostCommitTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("post failure");
+ }
+ }
+
+ @Override
+ public boolean rollbackTx() throws Exception {
+ rolledBack = true;
+ if (failRollbackTxOnce == InduceFailure.ThrowException) {
+ failRollbackTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("rollback failure");
+ }
+ if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
+ failRollbackTxOnce = InduceFailure.NoFailure;
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String getTransactionAwareName() {
+ return "dummy";
+ }
+ }
+
+ static class DummyTxClient extends InMemoryTxSystemClient {
+
+ boolean failCanCommitOnce = false;
+ int failCommits = 0;
+ enum CommitState {
+ Started, Committed, Aborted, Invalidated
+ }
+ CommitState state = CommitState.Started;
+
+ @Inject
+ DummyTxClient(TransactionManager txmgr) {
+ super(txmgr);
+ }
+
+ @Override
+ public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
+ if (failCanCommitOnce) {
+ failCanCommitOnce = false;
+ return false;
+ } else {
+ return super.canCommit(tx, changeIds);
+ }
+ }
+
+ @Override
+ public boolean commit(Transaction tx) throws TransactionNotInProgressException {
+ if (failCommits-- > 0) {
+ return false;
+ } else {
+ state = CommitState.Committed;
+ return super.commit(tx);
+ }
+ }
+
+ @Override
+ public Transaction startLong() {
+ state = CommitState.Started;
+ return super.startLong();
+ }
+
+ @Override
+ public Transaction startShort() {
+ state = CommitState.Started;
+ return super.startShort();
+ }
+
+ @Override
+ public Transaction startShort(int timeout) {
+ state = CommitState.Started;
+ return super.startShort(timeout);
+ }
+
+ @Override
+ public void abort(Transaction tx) {
+ state = CommitState.Aborted;
+ super.abort(tx);
+ }
+
+ @Override
+ public boolean invalidate(long tx) {
+ state = CommitState.Invalidated;
+ return super.invalidate(tx);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
new file mode 100644
index 0000000..f74e209
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests the {@link TransactionManager}, using in-memory transaction state storage.
+ */
+public class TransactionManagerTest extends TransactionSystemTest {
+
+ static Configuration conf = new Configuration();
+
+ TransactionManager txManager = null;
+ TransactionStateStorage txStateStorage = null;
+
+ @Override
+ protected TransactionSystemClient getClient() {
+ return new InMemoryTxSystemClient(txManager);
+ }
+
+ @Override
+ protected TransactionStateStorage getStateStorage() {
+ return txStateStorage;
+ }
+
+ @Before
+ public void before() {
+ conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
+ // todo should create two sets of tests, one with LocalFileTxStateStorage and one with InMemoryTxStateStorage
+ txStateStorage = new InMemoryTransactionStateStorage();
+ txManager = new TransactionManager
+ (conf, txStateStorage, new TxMetricsCollector());
+ txManager.startAndWait();
+ }
+
+ @After
+ public void after() {
+ txManager.stopAndWait();
+ }
+
+ @Test
+ public void testTransactionCleanup() throws Exception {
+ conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
+ conf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2);
+ // using a new tx manager that cleans up
+ TransactionManager txm = new TransactionManager
+ (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+ txm.startAndWait();
+ try {
+ Assert.assertEquals(0, txm.getInvalidSize());
+ Assert.assertEquals(0, txm.getCommittedSize());
+ // start a transaction and leave it open
+ Transaction tx1 = txm.startShort();
+ // start a long running transaction and leave it open
+ Transaction tx2 = txm.startLong();
+ Transaction tx3 = txm.startLong();
+ // start and commit a bunch of transactions
+ for (int i = 0; i < 10; i++) {
+ Transaction tx = txm.startShort();
+ Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
+ Assert.assertTrue(txm.commit(tx));
+ }
+ // all of these should still be in the committed set
+ Assert.assertEquals(0, txm.getInvalidSize());
+ Assert.assertEquals(10, txm.getCommittedSize());
+ // sleep longer than the cleanup interval
+ TimeUnit.SECONDS.sleep(5);
+ // transaction should now be invalid
+ Assert.assertEquals(1, txm.getInvalidSize());
+ // run another transaction
+ Transaction txx = txm.startShort();
+ // verify that the still-open transactions (tx1, tx2, tx3) are excluded, i.e. not visible to the new transaction
+ Assert.assertFalse(txx.isVisible(tx1.getTransactionId()));
+ Assert.assertFalse(txx.isVisible(tx2.getTransactionId()));
+ Assert.assertFalse(txx.isVisible(tx3.getTransactionId()));
+ // try to commit the last transaction that was started
+ Assert.assertTrue(txm.canCommit(txx, Collections.singleton(new byte[] { 0x0a })));
+ Assert.assertTrue(txm.commit(txx));
+
+ // now the committed change sets should be empty again
+ Assert.assertEquals(0, txm.getCommittedSize());
+ // cannot commit transaction as it was timed out
+ try {
+ txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
+ Assert.fail();
+ } catch (TransactionNotInProgressException e) {
+ // expected
+ }
+ txm.abort(tx1);
+ // abort should have removed from invalid
+ Assert.assertEquals(0, txm.getInvalidSize());
+ // run another bunch of transactions
+ for (int i = 0; i < 10; i++) {
+ Transaction tx = txm.startShort();
+ Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
+ Assert.assertTrue(txm.commit(tx));
+ }
+ // none of these should still be in the committed set (tx2 is long-running).
+ Assert.assertEquals(0, txm.getInvalidSize());
+ Assert.assertEquals(0, txm.getCommittedSize());
+ // commit tx2, abort tx3
+ Assert.assertTrue(txm.commit(tx2));
+ txm.abort(tx3);
+ // none of these should still be in the committed set (tx2 is long-running).
+ // Only tx3 is in the invalid list, as it was aborted and is long-running. tx1 is a short one and it rolled back
+ // its changes, so it should NOT be in the invalid list
+ Assert.assertEquals(1, txm.getInvalidSize());
+ Assert.assertEquals(tx3.getTransactionId(), (long) txm.getCurrentState().getInvalid().iterator().next());
+ Assert.assertEquals(1, txm.getExcludedListSize());
+ } finally {
+ txm.stopAndWait();
+ }
+ }
+
+ @Test
+ public void testLongTransactionCleanup() throws Exception {
+ conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
+ conf.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2);
+ // using a new tx manager that cleans up
+ TransactionManager txm = new TransactionManager
+ (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+ txm.startAndWait();
+ try {
+ Assert.assertEquals(0, txm.getInvalidSize());
+ Assert.assertEquals(0, txm.getCommittedSize());
+
+ // start a long running transaction
+ Transaction tx1 = txm.startLong();
+
+ Assert.assertEquals(0, txm.getInvalidSize());
+ Assert.assertEquals(0, txm.getCommittedSize());
+
+ // sleep longer than the cleanup interval
+ TimeUnit.SECONDS.sleep(5);
+
+ // transaction should now be invalid
+ Assert.assertEquals(1, txm.getInvalidSize());
+ Assert.assertEquals(0, txm.getCommittedSize());
+
+ // cannot commit transaction as it was timed out
+ try {
+ txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
+ Assert.fail();
+ } catch (TransactionNotInProgressException e) {
+ // expected
+ }
+
+ txm.abort(tx1);
+ // abort should not remove long running transaction from invalid list
+ Assert.assertEquals(1, txm.getInvalidSize());
+ } finally {
+ txm.stopAndWait();
+ }
+ }
+
+ @Test
+ public void testTruncateInvalid() throws Exception {
+ InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage();
+ Configuration testConf = new Configuration(conf);
+ // No snapshots
+ testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1);
+ TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector());
+ txm1.startAndWait();
+
+ TransactionManager txm2 = null;
+ Transaction tx1;
+ Transaction tx2;
+ Transaction tx3;
+ Transaction tx4;
+ Transaction tx5;
+ Transaction tx6;
+ try {
+ Assert.assertEquals(0, txm1.getInvalidSize());
+
+ // start a few transactions
+ tx1 = txm1.startLong();
+ tx2 = txm1.startShort();
+ tx3 = txm1.startLong();
+ tx4 = txm1.startShort();
+ tx5 = txm1.startLong();
+ tx6 = txm1.startShort();
+
+ // invalidate tx1, tx2, tx5 and tx6
+ txm1.invalidate(tx1.getTransactionId());
+ txm1.invalidate(tx2.getTransactionId());
+ txm1.invalidate(tx5.getTransactionId());
+ txm1.invalidate(tx6.getTransactionId());
+
+ // tx1, tx2, tx5 and tx6 should be in invalid list
+ Assert.assertEquals(
+ ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(),
+ tx6.getTransactionId()),
+ txm1.getCurrentState().getInvalid()
+ );
+
+ // remove tx1 and tx6 from invalid list
+ Assert.assertTrue(txm1.truncateInvalidTx(ImmutableSet.of(tx1.getTransactionId(), tx6.getTransactionId())));
+
+ // only tx2 and tx5 should be in invalid list now
+ Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
+ txm1.getCurrentState().getInvalid());
+
+ // removing in-progress transactions should not have any effect
+ Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+ txm1.getCurrentState().getInProgress().keySet());
+ Assert.assertFalse(txm1.truncateInvalidTx(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId())));
+ // no change to in-progress
+ Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+ txm1.getCurrentState().getInProgress().keySet());
+ // no change to invalid list
+ Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
+ txm1.getCurrentState().getInvalid());
+
+ // Test transaction edit logs replay
+ // Start another transaction manager without stopping txm1 so that snapshot does not get written,
+ // and all logs can be replayed.
+ txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector());
+ txm2.startAndWait();
+ Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
+ txm2.getCurrentState().getInvalid());
+ Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+ txm2.getCurrentState().getInProgress().keySet());
+ } finally {
+ txm1.stopAndWait();
+ if (txm2 != null) {
+ txm2.stopAndWait();
+ }
+ }
+ }
+
+ @Test
+ public void testTruncateInvalidBeforeTime() throws Exception {
+ InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage();
+ Configuration testConf = new Configuration(conf);
+ // No snapshots
+ testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1);
+ TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector());
+ txm1.startAndWait();
+
+ TransactionManager txm2 = null;
+ Transaction tx1;
+ Transaction tx2;
+ Transaction tx3;
+ Transaction tx4;
+ Transaction tx5;
+ Transaction tx6;
+ try {
+ Assert.assertEquals(0, txm1.getInvalidSize());
+
+ // start a few transactions
+ tx1 = txm1.startLong();
+ tx2 = txm1.startShort();
+ // Sleep so that transaction ids get generated a millisecond apart for assertion
+ // TEPHRA-63 should eliminate the need to sleep
+ TimeUnit.MILLISECONDS.sleep(1);
+ long timeBeforeTx3 = System.currentTimeMillis();
+ tx3 = txm1.startLong();
+ tx4 = txm1.startShort();
+ TimeUnit.MILLISECONDS.sleep(1);
+ long timeBeforeTx5 = System.currentTimeMillis();
+ tx5 = txm1.startLong();
+ tx6 = txm1.startShort();
+
+ // invalidate tx1, tx2, tx5 and tx6
+ txm1.invalidate(tx1.getTransactionId());
+ txm1.invalidate(tx2.getTransactionId());
+ txm1.invalidate(tx5.getTransactionId());
+ txm1.invalidate(tx6.getTransactionId());
+
+ // tx1, tx2, tx5 and tx6 should be in invalid list
+ Assert.assertEquals(
+ ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(),
+ tx6.getTransactionId()),
+ txm1.getCurrentState().getInvalid()
+ );
+
+ // remove transactions before tx3 from invalid list
+ Assert.assertTrue(txm1.truncateInvalidTxBefore(timeBeforeTx3));
+
+ // only tx5 and tx6 should be in invalid list now
+ Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
+ txm1.getCurrentState().getInvalid());
+
+ // removing invalid transactions before tx5 should throw exception since tx3 and tx4 are in-progress
+ Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+ txm1.getCurrentState().getInProgress().keySet());
+ try {
+ txm1.truncateInvalidTxBefore(timeBeforeTx5);
+ Assert.fail("Expected InvalidTruncateTimeException exception");
+ } catch (InvalidTruncateTimeException e) {
+ // Expected exception
+ }
+ // no change to in-progress
+ Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+ txm1.getCurrentState().getInProgress().keySet());
+ // no change to invalid list
+ Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
+ txm1.getCurrentState().getInvalid());
+
+ // Test transaction edit logs replay
+ // Start another transaction manager without stopping txm1 so that snapshot does not get written,
+ // and all logs can be replayed.
+ txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector());
+ txm2.startAndWait();
+ Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
+ txm2.getCurrentState().getInvalid());
+ Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
+ txm2.getCurrentState().getInProgress().keySet());
+ } finally {
+ txm1.stopAndWait();
+ if (txm2 != null) {
+ txm2.stopAndWait();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java
new file mode 100644
index 0000000..fa12acb
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Test for verifying TransactionServiceMain works correctly.
+ */
+public class TransactionServiceMainTest {
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  /**
+   * Starts an in-memory ZooKeeper server and a {@link TransactionServiceMain} on a background thread, then runs
+   * {@code TransactionServiceClient.doMain(true, conf)} against it. Both are always stopped afterwards.
+   *
+   * @throws Exception if the transaction service fails to start, or if the client run or the shutdown fails
+   */
+  @Test
+  public void testClientServer() throws Exception {
+    // Simply start a transaction server and connect to it with the client.
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+    zkServer.startAndWait();
+
+    try {
+      Configuration conf = new Configuration();
+      conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
+      conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+
+      final TransactionServiceMain main = new TransactionServiceMain(conf);
+      final CountDownLatch latch = new CountDownLatch(1);
+      // Startup failure of the service thread, if any. It is written before latch.countDown() and read after
+      // latch.await(), so the latch makes the write visible to the test thread.
+      final Throwable[] startFailure = new Throwable[1];
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            main.start();
+          } catch (Throwable e) {
+            startFailure[0] = e;
+          } finally {
+            // Always release the test thread; otherwise a failed start would leave it blocked in await() forever.
+            latch.countDown();
+          }
+        }
+      };
+
+      try {
+        t.start();
+        // Wait for service to startup
+        latch.await();
+        if (startFailure[0] != null) {
+          // Fail with the real cause instead of running the client against a server that never came up.
+          throw Throwables.propagate(startFailure[0]);
+        }
+        TransactionServiceClient.doMain(true, conf);
+      } finally {
+        main.stop();
+        t.join();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
new file mode 100644
index 0000000..797c08a
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Common transaction system tests; subclasses supply the client and state storage under test.
+ */
+public abstract class TransactionSystemTest {
+
+ public static final byte[] C1 = new byte[] { 'c', '1' };
+ public static final byte[] C2 = new byte[] { 'c', '2' };
+ public static final byte[] C3 = new byte[] { 'c', '3' };
+ public static final byte[] C4 = new byte[] { 'c', '4' };
+
+ protected abstract TransactionSystemClient getClient() throws Exception;
+
+ protected abstract TransactionStateStorage getStateStorage() throws Exception;
+
+ @Test
+ public void testCommitRaceHandling() throws Exception {
+ TransactionSystemClient client1 = getClient();
+ TransactionSystemClient client2 = getClient();
+
+ Transaction tx1 = client1.startShort();
+ Transaction tx2 = client2.startShort();
+
+ Assert.assertTrue(client1.canCommit(tx1, asList(C1, C2)));
+ // second one also can commit even though there are conflicts with first since first one hasn't committed yet
+ Assert.assertTrue(client2.canCommit(tx2, asList(C2, C3)));
+
+ Assert.assertTrue(client1.commit(tx1));
+
+ // now second one should not commit, since there are conflicts with tx1 that has been committed
+ Assert.assertFalse(client2.commit(tx2));
+ }
+
+ @Test
+ public void testMultipleCommitsAtSameTime() throws Exception {
+ // We want to check that if two txs finish at same time (wrt tx manager) they do not overwrite changesets of each
+ // other in tx manager used for conflicts detection (we had this bug)
+ // NOTE: you don't have to use multiple clients for that
+ TransactionSystemClient client1 = getClient();
+ TransactionSystemClient client2 = getClient();
+ TransactionSystemClient client3 = getClient();
+ TransactionSystemClient client4 = getClient();
+ TransactionSystemClient client5 = getClient();
+
+ Transaction tx1 = client1.startShort();
+ Transaction tx2 = client2.startShort();
+ Transaction tx3 = client3.startShort();
+ Transaction tx4 = client4.startShort();
+ Transaction tx5 = client5.startShort();
+
+ Assert.assertTrue(client1.canCommit(tx1, asList(C1)));
+ Assert.assertTrue(client1.commit(tx1));
+
+ Assert.assertTrue(client2.canCommit(tx2, asList(C2)));
+ Assert.assertTrue(client2.commit(tx2));
+
+ // verifying conflicts detection
+ Assert.assertFalse(client3.canCommit(tx3, asList(C1)));
+ Assert.assertFalse(client4.canCommit(tx4, asList(C2)));
+ Assert.assertTrue(client5.canCommit(tx5, asList(C3)));
+ }
+
+ @Test
+ public void testCommitTwice() throws Exception {
+ TransactionSystemClient client = getClient();
+ Transaction tx = client.startShort();
+
+ Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+ Assert.assertTrue(client.commit(tx));
+ // cannot commit the same tx twice
+ try {
+ Assert.assertFalse(client.commit(tx));
+ Assert.fail();
+ } catch (TransactionNotInProgressException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testAbortTwice() throws Exception {
+ TransactionSystemClient client = getClient();
+ Transaction tx = client.startShort();
+
+ Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+ client.abort(tx);
+ // abort of not active tx has no effect
+ client.abort(tx);
+ }
+
+ @Test
+ public void testReuseTx() throws Exception {
+ TransactionSystemClient client = getClient();
+ Transaction tx = client.startShort();
+
+ Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+ Assert.assertTrue(client.commit(tx));
+ // can't re-use same tx again
+ try {
+ client.canCommit(tx, asList(C3, C4));
+ Assert.fail();
+ } catch (TransactionNotInProgressException e) {
+ // expected
+ }
+ try {
+ Assert.assertFalse(client.commit(tx));
+ Assert.fail();
+ } catch (TransactionNotInProgressException e) {
+ // expected
+ }
+
+ // abort of not active tx has no effect
+ client.abort(tx);
+ }
+
+ @Test
+ public void testUseNotStarted() throws Exception {
+ TransactionSystemClient client = getClient();
+ Transaction tx1 = client.startShort();
+ Assert.assertTrue(client.commit(tx1));
+
+ // we know this one is older than the current write pointer and was not used
+ Transaction txOld = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() - 1,
+ new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,
+ TransactionType.SHORT);
+ try {
+ Assert.assertFalse(client.canCommit(txOld, asList(C3, C4)));
+ Assert.fail();
+ } catch (TransactionNotInProgressException e) {
+ // expected
+ }
+ try {
+ Assert.assertFalse(client.commit(txOld));
+ Assert.fail();
+ } catch (TransactionNotInProgressException e) {
+ // expected
+ }
+ // abort of not active tx has no effect
+ client.abort(txOld);
+
+ // we know this one is newer than the current read pointer and was not used
+ Transaction txNew = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() + 1,
+ new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,
+ TransactionType.SHORT);
+ try {
+ Assert.assertFalse(client.canCommit(txNew, asList(C3, C4)));
+ Assert.fail();
+ } catch (TransactionNotInProgressException e) {
+ // expected
+ }
+ try {
+ Assert.assertFalse(client.commit(txNew));
+ Assert.fail();
+ } catch (TransactionNotInProgressException e) {
+ // expected
+ }
+ // abort of not active tx has no effect
+ client.abort(txNew);
+ }
+
+ @Test
+ public void testAbortAfterCommit() throws Exception {
+ TransactionSystemClient client = getClient();
+ Transaction tx = client.startShort();
+
+ Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+ Assert.assertTrue(client.commit(tx));
+ // abort of not active tx has no effect
+ client.abort(tx);
+ }
+
+ // todo add test invalidate method
+ @Test
+ public void testInvalidateTx() throws Exception {
+ TransactionSystemClient client = getClient();
+ // Invalidate an in-progress tx
+ Transaction tx1 = client.startShort();
+ client.canCommit(tx1, asList(C1, C2));
+ Assert.assertTrue(client.invalidate(tx1.getTransactionId()));
+ // Cannot invalidate a committed tx
+ Transaction tx2 = client.startShort();
+ client.canCommit(tx2, asList(C3, C4));
+ client.commit(tx2);
+ Assert.assertFalse(client.invalidate(tx2.getTransactionId()));
+ }
+
+ @Test
+ public void testResetState() throws Exception {
+ // have tx in progress, committing and committed then reset,
+ // get the last snapshot and see that it is empty
+ TransactionSystemClient client = getClient();
+ TransactionStateStorage stateStorage = getStateStorage();
+
+ Transaction tx1 = client.startShort();
+ Transaction tx2 = client.startShort();
+ client.canCommit(tx1, asList(C1, C2));
+ client.commit(tx1);
+ client.canCommit(tx2, asList(C3, C4));
+
+ Transaction txPreReset = client.startShort();
+ long currentTs = System.currentTimeMillis();
+ client.resetState();
+
+ TransactionSnapshot snapshot = stateStorage.getLatestSnapshot();
+ Assert.assertTrue(snapshot.getTimestamp() >= currentTs);
+ Assert.assertEquals(0, snapshot.getInvalid().size());
+ Assert.assertEquals(0, snapshot.getInProgress().size());
+ Assert.assertEquals(0, snapshot.getCommittingChangeSets().size());
+ Assert.assertEquals(0, snapshot.getCommittedChangeSets().size());
+
+ // confirm that transaction IDs are not reset
+ Transaction txPostReset = client.startShort();
+ Assert.assertTrue("New tx ID should be greater than last ID before reset",
+ txPostReset.getTransactionId() > txPreReset.getTransactionId());
+ }
+
+ @Test
+ public void testTruncateInvalidTx() throws Exception {
+ // Start a few transactions and invalidate all of them
+ TransactionSystemClient client = getClient();
+ Transaction tx1 = client.startLong();
+ Transaction tx2 = client.startShort();
+ Transaction tx3 = client.startLong();
+
+ client.invalidate(tx1.getTransactionId());
+ client.invalidate(tx2.getTransactionId());
+ client.invalidate(tx3.getTransactionId());
+
+ // Remove tx2 and tx3 from invalid list
+ Assert.assertTrue(client.truncateInvalidTx(ImmutableSet.of(tx2.getTransactionId(), tx3.getTransactionId())));
+
+ Transaction tx = client.startShort();
+ // Only tx1 should be in invalid list now
+ Assert.assertArrayEquals(new long[] {tx1.getTransactionId()}, tx.getInvalids());
+ client.abort(tx);
+ }
+
+ @Test
+ public void testTruncateInvalidTxBefore() throws Exception {
+ // Start a few transactions
+ TransactionSystemClient client = getClient();
+ Transaction tx1 = client.startLong();
+ Transaction tx2 = client.startShort();
+ // Sleep so that transaction ids get generated a millisecond apart for assertion
+ // TEPHRA-63 should eliminate the need to sleep
+ TimeUnit.MILLISECONDS.sleep(1);
+ long beforeTx3 = System.currentTimeMillis();
+ Transaction tx3 = client.startLong();
+
+ try {
+ // throws exception since tx1 and tx2 are still in-progress
+ client.truncateInvalidTxBefore(beforeTx3);
+ Assert.fail("Expected InvalidTruncateTimeException exception");
+ } catch (InvalidTruncateTimeException e) {
+ // Expected exception
+ }
+
+ // Invalidate all of them
+ client.invalidate(tx1.getTransactionId());
+ client.invalidate(tx2.getTransactionId());
+ client.invalidate(tx3.getTransactionId());
+
+ // Remove transactions before time beforeTx3
+ Assert.assertTrue(client.truncateInvalidTxBefore(beforeTx3));
+
+ Transaction tx = client.startShort();
+ // Only tx3 should be in invalid list now
+ Assert.assertArrayEquals(new long[] {tx3.getTransactionId()}, tx.getInvalids());
+ client.abort(tx);
+ }
+
+ @Test
+ public void testGetInvalidSize() throws Exception {
+ // Start a few transactions and invalidate all of them
+ TransactionSystemClient client = getClient();
+ Transaction tx1 = client.startLong();
+ Transaction tx2 = client.startShort();
+ Transaction tx3 = client.startLong();
+
+ Assert.assertEquals(0, client.getInvalidSize());
+
+ client.invalidate(tx1.getTransactionId());
+ client.invalidate(tx2.getTransactionId());
+ client.invalidate(tx3.getTransactionId());
+
+ Assert.assertEquals(3, client.getInvalidSize());
+ }
+
+ private Collection<byte[]> asList(byte[]... val) {
+ return Arrays.asList(val);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java
new file mode 100644
index 0000000..08961f4
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.primitives.Longs;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Tests {@link Transaction#isVisible} for the different {@link Transaction.VisibilityLevel}s.
+ */
+public class TransactionTest {
+ // Current transaction id
+ private final long txId = 200L;
+ // Read pointer for current transaction
+ private final long readPointer = 100L;
+ // Transactions committed before current transaction was started.
+ private final Set<Long> priorCommitted = ImmutableSet.of(80L, 99L, 100L);
+ // Transactions committed after current transaction was started.
+ private final Set<Long> postCommitted = ImmutableSet.of(150L, 180L, 210L);
+ // Invalid transactions before current transaction was started.
+ private final Set<Long> priorInvalids = ImmutableSet.of(90L, 110L, 190L);
+ // Invalid transactions after current transaction was started.
+ private final Set<Long> postInvalids = ImmutableSet.of(201L, 221L, 231L);
+ // Transactions in progress before current transaction was started.
+ private final Set<Long> priorInProgress = ImmutableSet.of(95L, 120L, 150L);
+ // Transactions in progress after current transaction was started.
+ private final Set<Long> postInProgress = ImmutableSet.of(205L, 215L, 300L);
+
+ @Test
+ public void testSnapshotVisibility() throws Exception {
+ Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT;
+
+ Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
+ Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
+ toSortedArray(priorInProgress), 95L,
+ TransactionType.SHORT, toSortedArray(checkPointers),
+ visibilityLevel);
+ Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L);
+ Set<Long> notVisibleCurrent = ImmutableSet.of();
+ assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+ visibleCurrent, notVisibleCurrent, tx);
+
+ checkPointers = ImmutableSet.of();
+ tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L,
+ TransactionType.SHORT, toSortedArray(checkPointers),
+ visibilityLevel);
+ visibleCurrent = ImmutableSet.of(txId);
+ notVisibleCurrent = ImmutableSet.of();
+ assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+ visibleCurrent, notVisibleCurrent, tx);
+ }
+
+ @Test
+ public void testSnapshotExcludeVisibility() throws Exception {
+ Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+
+ Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
+ Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
+ toSortedArray(priorInProgress), 95L,
+ TransactionType.SHORT, toSortedArray(checkPointers),
+ visibilityLevel);
+ Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L);
+ Set<Long> notVisibleCurrent = ImmutableSet.of(250L);
+ assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+ visibleCurrent, notVisibleCurrent, tx);
+
+ checkPointers = ImmutableSet.of();
+ tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L,
+ TransactionType.SHORT, toSortedArray(checkPointers),
+ visibilityLevel);
+ visibleCurrent = ImmutableSet.of();
+ notVisibleCurrent = ImmutableSet.of(txId);
+ assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+ visibleCurrent, notVisibleCurrent, tx);
+ }
+
+ @Test
+ public void testSnapshotAllVisibility() throws Exception {
+ Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_ALL;
+
+ Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
+ Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
+ toSortedArray(priorInProgress), 95L,
+ TransactionType.SHORT, toSortedArray(checkPointers),
+ visibilityLevel);
+ Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L);
+ Set<Long> notVisibleCurrent = ImmutableSet.of();
+ assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+ visibleCurrent, notVisibleCurrent, tx);
+
+ checkPointers = ImmutableSet.of();
+ tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids),
+ toSortedArray(priorInProgress), 95L,
+ TransactionType.SHORT, toSortedArray(checkPointers),
+ visibilityLevel);
+ visibleCurrent = ImmutableSet.of(txId);
+ notVisibleCurrent = ImmutableSet.of();
+ assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
+ visibleCurrent, notVisibleCurrent, tx);
+ }
+
+ private void assertVisibility(Set<Long> priorCommitted, Set<Long> postCommitted, Set<Long> priorInvalids,
+ Set<Long> postInvalids, Set<Long> priorInProgress, Set<Long> postInProgress,
+ Set<Long> visibleCurrent, Set<Long> notVisibleCurrent,
+ Transaction tx) {
+ // Verify visible snapshots of tx are visible
+ for (long t : visibleCurrent) {
+ Assert.assertTrue("Assertion error for version = " + t, tx.isVisible(t));
+ }
+
+ // Verify not visible snapshots of tx are not visible
+ for (long t : notVisibleCurrent) {
+ Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
+ }
+
+ // Verify prior committed versions are visible
+ for (long t : priorCommitted) {
+ Assert.assertTrue("Assertion error for version = " + t, tx.isVisible(t));
+ }
+
+ // Verify versions committed after tx started, and not part of tx are not visible
+ for (long t : postCommitted) {
+ Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
+ }
+
+ // Verify invalid and in-progress versions are not visible
+ for (long t : Iterables.concat(priorInvalids, postInvalids, priorInProgress, postInProgress)) {
+ Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
+ }
+ }
+
+ private long[] toSortedArray(Set<Long> set) {
+ long[] array = Longs.toArray(set);
+ Arrays.sort(array);
+ return array;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java
new file mode 100644
index 0000000..6a30280
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import com.google.common.base.Throwables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests for {@link ElasticPool}.
+ */
+public class ElasticPoolTest {
+
+  /** Pool element that counts, in a shared static counter, how many instances have been constructed. */
+  static class Dummy {
+    static final AtomicInteger count = new AtomicInteger(0);
+    boolean valid = true;
+    Dummy() {
+      count.incrementAndGet();
+    }
+    void markInvalid() {
+      valid = false;
+    }
+
+    public boolean isValid() {
+      return valid;
+    }
+  }
+
+  /** Pool of {@link Dummy} elements whose {@code recycle} reports whether the element is still valid. */
+  static class DummyPool extends ElasticPool<Dummy, RuntimeException> {
+
+    public DummyPool(int sizeLimit) {
+      super(sizeLimit);
+    }
+
+    @Override
+    protected Dummy create() {
+      return new Dummy();
+    }
+
+    @Override
+    protected boolean recycle(Dummy element) {
+      return element.isValid();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testFewerThreadsThanElements() throws InterruptedException {
+    final DummyPool pool = new DummyPool(5);
+    Dummy.count.set(0);
+    createAndRunThreads(2, pool, false);
+    // we only ran 2 threads, so only 2 elements got created, even though pool size is 5
+    Assert.assertEquals(2, Dummy.count.get());
+  }
+
+  @Test(timeout = 5000)
+  public void testMoreThreadsThanElements() throws InterruptedException {
+    final DummyPool pool = new DummyPool(2);
+    Dummy.count.set(0);
+    createAndRunThreads(5, pool, false);
+    // even though we ran 5 threads, only 2 elements got created because pool size is 2
+    Assert.assertEquals(2, Dummy.count.get());
+  }
+
+  @Test(timeout = 5000)
+  public void testMoreThreadsThanElementsWithDiscard() throws InterruptedException {
+    final DummyPool pool = new DummyPool(2);
+    Dummy.count.set(0);
+    int numThreads = 3;
+    // pass 'true' as the last parameter, which results in the elements being discarded after each obtain() call.
+    createAndRunThreads(numThreads, pool, true);
+    // this results in (5 * numThreads) number of elements being created since each thread obtains a client 5 times.
+    Assert.assertEquals(5 * numThreads, Dummy.count.get());
+  }
+
+  /**
+   * Creates threads which each obtain an element from the pool, sleep briefly, and then release the element back
+   * to the pool, optionally marking it invalid before doing so. Each thread repeats this five times. All threads
+   * are run to completion, and the first failure raised inside any of them is rethrown to the caller.
+   *
+   * @param numThreads number of worker threads to run
+   * @param pool the pool the workers obtain elements from and release them to
+   * @param discardAtEnd whether each element is marked invalid before it is released
+   * @throws InterruptedException if the calling thread is interrupted while waiting for the workers
+   */
+  private void createAndRunThreads(int numThreads, final DummyPool pool,
+                                   final boolean discardAtEnd) throws InterruptedException {
+    // First failure seen in any worker. Referenced by its fully qualified name so that no new import is needed.
+    final java.util.concurrent.atomic.AtomicReference<Throwable> failure =
+      new java.util.concurrent.atomic.AtomicReference<Throwable>();
+    Thread[] threads = new Thread[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          try {
+            for (int j = 0; j < 5; ++j) {
+              Dummy dummy = pool.obtain();
+              try {
+                Thread.sleep(10L);
+              } catch (InterruptedException e) {
+                // Keep the interrupt status, but still hand the element back to the pool below.
+                Thread.currentThread().interrupt();
+              }
+              if (discardAtEnd) {
+                dummy.markInvalid();
+              }
+              pool.release(dummy);
+            }
+          } catch (Throwable e) {
+            // Remember only the first failure; it is reported on the test thread after join().
+            failure.compareAndSet(null, e);
+          }
+        }
+      };
+    }
+    for (Thread t : threads) {
+      t.start();
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+    // A failure in a worker thread would otherwise be lost and never fail the test directly.
+    if (failure.get() != null) {
+      throw Throwables.propagate(failure.get());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
new file mode 100644
index 0000000..90a69e9
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import com.google.common.base.Throwables;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TransactionServiceMain;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests the client pool size limit and the client obtain timeout of {@link PooledClientProvider}.
+ */
+public class PooledClientProviderTest {
+
+  public static final int MAX_CLIENT_COUNT = 3;
+  public static final long CLIENT_OBTAIN_TIMEOUT = 10;
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  /**
+   * Starts an in-memory ZooKeeper server and a {@link TransactionServiceMain} on a background thread, then runs
+   * the pool checks of {@link #startClientAndTestPool(Configuration)} against it.
+   *
+   * @throws Exception if the transaction service fails to start, or if the pool checks or the shutdown fail
+   */
+  @Test
+  public void testClientConnectionPoolMaximumNumberOfClients() throws Exception {
+    // We need a server for the client to connect to
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+    zkServer.startAndWait();
+
+    try {
+      Configuration conf = new Configuration();
+      conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
+      conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+      conf.set("data.tx.client.count", Integer.toString(MAX_CLIENT_COUNT));
+      conf.set("data.tx.client.obtain.timeout", Long.toString(CLIENT_OBTAIN_TIMEOUT));
+
+      final TransactionServiceMain main = new TransactionServiceMain(conf);
+      final CountDownLatch latch = new CountDownLatch(1);
+      // Startup failure of the service thread, if any. It is written before latch.countDown() and read after
+      // latch.await(), so the latch makes the write visible to the test thread.
+      final Throwable[] startFailure = new Throwable[1];
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            main.start();
+          } catch (Throwable e) {
+            startFailure[0] = e;
+          } finally {
+            // Always release the test thread; otherwise a failed start would leave it blocked in await() forever.
+            latch.countDown();
+          }
+        }
+      };
+
+      try {
+        t.start();
+        // Wait for service to startup
+        latch.await();
+        if (startFailure[0] != null) {
+          // Fail with the real cause instead of testing the pool against a server that never came up.
+          throw Throwables.propagate(startFailure[0]);
+        }
+
+        startClientAndTestPool(conf);
+      } finally {
+        main.stop();
+        t.join();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  /**
+   * Creates a {@link PooledClientProvider} and checks that at most {@link #MAX_CLIENT_COUNT} distinct clients are
+   * handed out, and that exactly one of {@code MAX_CLIENT_COUNT + 1} concurrent requests times out when every
+   * client is held for longer than {@link #CLIENT_OBTAIN_TIMEOUT}.
+   *
+   * @param conf configuration used to build the injector and the client provider
+   * @throws Exception if obtaining a client fails unexpectedly
+   */
+  private void startClientAndTestPool(Configuration conf) throws Exception {
+    Injector injector = Guice.createInjector(
+      new ConfigModule(conf),
+      new ZKModule(),
+      new DiscoveryModules().getDistributedModules(),
+      new TransactionModules().getDistributedModules(),
+      new TransactionClientModule()
+    );
+
+    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
+    zkClient.startAndWait();
+    ExecutorService executor = Executors.newFixedThreadPool(MAX_CLIENT_COUNT + 1);
+
+    try {
+      final PooledClientProvider clientProvider = new PooledClientProvider(conf,
+        injector.getInstance(DiscoveryServiceClient.class));
+
+      // test simple case of get + return. Note: this also initializes the provider's pool, which
+      // takes about one second (discovery). Doing it before we test the threads makes it so that one
+      // thread doesn't take exceptionally longer than the others.
+      try (CloseableThriftClient closeableThriftClient = clientProvider.getCloseableClient()) {
+        // do nothing with the client
+      }
+
+      // Now race to get MAX_CLIENT_COUNT+1 clients, exhausting the pool and requesting 1 more.
+      Set<Integer> ids = new HashSet<Integer>();
+      for (Future<Integer> id : submitRetrievers(executor, clientProvider, CLIENT_OBTAIN_TIMEOUT / 2)) {
+        ids.add(id.get());
+      }
+      Assert.assertEquals(MAX_CLIENT_COUNT, ids.size());
+
+      // Now try it again, where each thread holds onto the client for twice the client.obtain.timeout value.
+      // One of the threads should throw a TimeoutException, because the other threads don't release their clients
+      // within the configured timeout. Only the futures of this second round are inspected.
+      int numTimeoutExceptions = 0;
+      for (Future<Integer> clientId : submitRetrievers(executor, clientProvider, CLIENT_OBTAIN_TIMEOUT * 2)) {
+        try {
+          clientId.get();
+        } catch (ExecutionException expected) {
+          Assert.assertEquals(TimeoutException.class, expected.getCause().getClass());
+          numTimeoutExceptions++;
+        }
+      }
+      // expect that exactly one of the threads hit the TimeoutException
+      Assert.assertEquals(String.format("Expected one thread to not obtain a client within %s milliseconds.",
+                                        CLIENT_OBTAIN_TIMEOUT),
+                          1, numTimeoutExceptions);
+    } finally {
+      // Release the worker threads and the ZooKeeper client even when an assertion above fails.
+      executor.shutdown();
+      zkClient.stopAndWait();
+    }
+  }
+
+  /**
+   * Submits {@code MAX_CLIENT_COUNT + 1} {@link RetrieveClient} tasks and releases them all at the same time.
+   *
+   * @param executor executor the tasks are submitted to
+   * @param clientProvider provider every task obtains its client from
+   * @param holdClientMs number of milliseconds each task holds its client before returning it
+   * @return the futures of the submitted tasks, in submission order
+   */
+  private List<Future<Integer>> submitRetrievers(ExecutorService executor, PooledClientProvider clientProvider,
+                                                 long holdClientMs) {
+    List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
+    CountDownLatch begin = new CountDownLatch(1);
+    for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) {
+      futures.add(executor.submit(new RetrieveClient(clientProvider, holdClientMs, begin)));
+    }
+    begin.countDown();
+    return futures;
+  }
+
+  /**
+   * Task that waits for the start signal, obtains a client from the pool, holds it for a configured number of
+   * milliseconds and returns the identity hash code of the underlying thrift client.
+   */
+  private static class RetrieveClient implements Callable<Integer> {
+    private final PooledClientProvider pool;
+    private final long holdClientMs;
+    private final CountDownLatch begin;
+
+    public RetrieveClient(PooledClientProvider pool, long holdClientMs,
+                          CountDownLatch begin) {
+      this.pool = pool;
+      this.holdClientMs = holdClientMs;
+      this.begin = begin;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+      begin.await();
+      try (CloseableThriftClient client = pool.getCloseableClient()) {
+        int id = System.identityHashCode(client.getThriftClient());
+        // "use" the client for a configured amount of milliseconds
+        Thread.sleep(holdClientMs);
+        return id;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
new file mode 100644
index 0000000..a930720
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.ThriftTransactionSystemTest;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionEdit;
+import org.apache.tephra.persist.TransactionLog;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This tests whether transaction service hangs on stop when heavily loaded - https://issues.cask.co/browse/TEPHRA-132
+ */
+public class ThriftTransactionServerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);
+
+ private static InMemoryZKServer zkServer;
+ private static ZKClientService zkClientService;
+ private static TransactionService txService;
+ private static TransactionStateStorage storage;
+ static Injector injector;
+
+ private static final int NUM_CLIENTS = 17;
+ private static final CountDownLatch STORAGE_WAIT_LATCH = new CountDownLatch(1);
+ private static final CountDownLatch CLIENTS_DONE_LATCH = new CountDownLatch(NUM_CLIENTS);
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void start() throws Exception {
+ zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+ zkServer.startAndWait();
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+ conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
+ conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+ conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+ conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT, NUM_CLIENTS);
+ conf.setLong(TxConstants.Service.CFG_DATA_TX_CLIENT_TIMEOUT, TimeUnit.HOURS.toMillis(1));
+ conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS, 2);
+ conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS, 4);
+
+ injector = Guice.createInjector(
+ new ConfigModule(conf),
+ new ZKModule(),
+ new DiscoveryModules().getDistributedModules(),
+ Modules.override(new TransactionModules().getDistributedModules())
+ .with(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(TransactionStateStorage.class).to(SlowTransactionStorage.class).in(Scopes.SINGLETON);
+ }
+ }),
+ new TransactionClientModule()
+ );
+
+ zkClientService = injector.getInstance(ZKClientService.class);
+ zkClientService.startAndWait();
+
+ // start a tx server
+ txService = injector.getInstance(TransactionService.class);
+ storage = injector.getInstance(TransactionStateStorage.class);
+ try {
+ LOG.info("Starting transaction service");
+ txService.startAndWait();
+ } catch (Exception e) {
+ LOG.error("Failed to start service: ", e);
+ }
+ }
+
+ @Before
+ public void reset() throws Exception {
+ getClient().resetState();
+ }
+
+ @AfterClass
+ public static void stop() throws Exception {
+ txService.stopAndWait();
+ storage.stopAndWait();
+ zkClientService.stopAndWait();
+ zkServer.stopAndWait();
+ }
+
+ public TransactionSystemClient getClient() throws Exception {
+ return injector.getInstance(TransactionSystemClient.class);
+ }
+
+ @Test
+ public void testThriftServerStop() throws Exception {
+ int nThreads = NUM_CLIENTS;
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+ for (int i = 0; i < nThreads; ++i) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ TransactionSystemClient txClient = getClient();
+ CLIENTS_DONE_LATCH.countDown();
+ txClient.startShort();
+ } catch (Exception e) {
+ // Exception expected
+ }
+ }
+ });
+ }
+
+ // Wait till all clients finish sending reqeust to transaction manager
+ CLIENTS_DONE_LATCH.await();
+ TimeUnit.SECONDS.sleep(1);
+
+ // Expire zookeeper session, which causes Thrift server to stop.
+ expireZkSession(zkClientService);
+ waitForThriftTermination();
+
+ // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift sever again.
+ zkClientService.stopAndWait();
+ STORAGE_WAIT_LATCH.countDown();
+ TimeUnit.SECONDS.sleep(1);
+
+ // Make sure Thrift server stopped.
+ Assert.assertEquals(Service.State.TERMINATED, txService.thriftRPCServerState());
+ }
+
+ private void expireZkSession(ZKClientService zkClientService) throws Exception {
+ ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get();
+ final SettableFuture<?> connectFuture = SettableFuture.create();
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.SyncConnected) {
+ connectFuture.set(null);
+ }
+ }
+ };
+
+ // Create another Zookeeper session with the same sessionId so that the original one expires.
+ final ZooKeeper dupZookeeper =
+ new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher,
+ zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
+ connectFuture.get(30, TimeUnit.SECONDS);
+ Assert.assertEquals("Failed to re-create current session", dupZookeeper.getState(), ZooKeeper.States.CONNECTED);
+ dupZookeeper.close();
+ }
+
+ private void waitForThriftTermination() throws InterruptedException {
+ int count = 0;
+ while (txService.thriftRPCServerState() != Service.State.TERMINATED && count++ < 200) {
+ TimeUnit.MILLISECONDS.sleep(50);
+ }
+ }
+
+ private static class SlowTransactionStorage extends InMemoryTransactionStateStorage {
+ @Override
+ public TransactionLog createLog(long timestamp) throws IOException {
+ return new SlowTransactionLog(timestamp);
+ }
+ }
+
+ private static class SlowTransactionLog extends InMemoryTransactionStateStorage.InMemoryTransactionLog {
+ public SlowTransactionLog(long timestamp) {
+ super(timestamp);
+ }
+
+ @Override
+ public void append(TransactionEdit edit) throws IOException {
+ try {
+ STORAGE_WAIT_LATCH.await();
+ } catch (InterruptedException e) {
+ LOG.error("Got exception: ", e);
+ }
+ super.append(edit);
+ }
+
+ @Override
+ public void append(List<TransactionEdit> edits) throws IOException {
+ try {
+ STORAGE_WAIT_LATCH.await();
+ } catch (InterruptedException e) {
+ LOG.error("Got exception: ", e);
+ }
+ super.append(edits);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java b/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java
new file mode 100644
index 0000000..4828adf
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.util.ConfigurationFactory;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.List;
+
+/**
+ * Common test class for TransactionVisibilityFilter implementations.
+ */
+public abstract class AbstractTransactionVisibilityFilterTest {
+
+ protected static final byte[] FAM = new byte[] {'f'};
+ protected static final byte[] FAM2 = new byte[] {'f', '2'};
+ protected static final byte[] FAM3 = new byte[] {'f', '3'};
+ protected static final byte[] COL = new byte[] {'c'};
+ protected static final List<byte[]> EMPTY_CHANGESET = Lists.newArrayListWithCapacity(0);
+
+ protected TransactionManager txManager;
+
+ @Before
+ public void setup() throws Exception {
+ Configuration conf = new ConfigurationFactory().get();
+ conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
+ txManager = new TransactionManager(conf);
+ txManager.startAndWait();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ txManager.stopAndWait();
+ }
+}