You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:30 UTC
[30/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java
deleted file mode 100644
index 419bffb..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java
+++ /dev/null
@@ -1,676 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.snapshot.SnapshotCodecV4;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-import com.google.inject.util.Modules;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Tests the transaction executor.
- */
-public class TransactionContextTest {
- private static DummyTxClient txClient;
-
- @ClassRule
- public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @BeforeClass
- public static void setup() throws IOException {
- final Configuration conf = new Configuration();
- conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
- conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
- Injector injector = Guice.createInjector(
- new ConfigModule(conf),
- new DiscoveryModules().getInMemoryModules(),
- Modules.override(
- new TransactionModules().getInMemoryModules()).with(new AbstractModule() {
- @Override
- protected void configure() {
- TransactionManager txManager = new TransactionManager(conf);
- txManager.startAndWait();
- bind(TransactionManager.class).toInstance(txManager);
- bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class);
- }
- }));
-
- txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class);
- }
-
- final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
-
- static final byte[] A = { 'a' };
- static final byte[] B = { 'b' };
-
- private static TransactionContext newTransactionContext(TransactionAware... txAwares) {
- return new TransactionContext(txClient, txAwares);
- }
-
- @Before
- public void resetTxAwares() {
- ds1.reset();
- ds2.reset();
- }
-
- @Test
- public void testSuccessful() throws TransactionFailureException, InterruptedException {
- TransactionContext context = newTransactionContext(ds1, ds2);
- // start transaction
- context.start();
- // add a change to ds1 and ds2
- ds1.addChange(A);
- ds2.addChange(B);
- // commit transaction
- context.finish();
- // verify both are committed and post-committed
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertTrue(ds2.committed);
- Assert.assertTrue(ds1.postCommitted);
- Assert.assertTrue(ds2.postCommitted);
- Assert.assertFalse(ds1.rolledBack);
- Assert.assertFalse(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
- }
-
- @Test
- public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
- ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
- TransactionContext context = newTransactionContext(ds1, ds2);
- // start transaction
- context.start();
- // add a change to ds1 and ds2
- ds1.addChange(A);
- ds2.addChange(B);
- // commit transaction should fail but without rollback as the failure happens post-commit
- try {
- context.finish();
- Assert.fail("post commit failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertEquals("post failure", e.getCause().getMessage());
- }
- // verify both are committed and post-committed
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertTrue(ds2.committed);
- Assert.assertTrue(ds1.postCommitted);
- Assert.assertTrue(ds2.postCommitted);
- Assert.assertFalse(ds1.rolledBack);
- Assert.assertFalse(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
- }
-
- @Test
- public void testPersistFailure() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ThrowException;
- TransactionContext context = newTransactionContext(ds1, ds2);
- // start transaction
- context.start();
- // add a change to ds1 and ds2
- ds1.addChange(A);
- ds2.addChange(B);
- // commit transaction should fail and cause rollback
- try {
- context.finish();
- Assert.fail("Persist should have failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertEquals("persist failure", e.getCause().getMessage());
- }
- // verify both are rolled back and tx is aborted
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertFalse(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
- Assert.assertTrue(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
- }
-
- @Test
- public void testPersistFalse() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
- TransactionContext context = newTransactionContext(ds1, ds2);
- // start transaction
- context.start();
- // add a change to ds1 and ds2
- ds1.addChange(A);
- ds2.addChange(B);
- // commit transaction should fail and cause rollback
- try {
- context.finish();
- Assert.fail("Persist should have failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
- }
- // verify both are rolled back and tx is aborted
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertFalse(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
- Assert.assertTrue(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
- }
-
- @Test
- public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ThrowException;
- ds1.failRollbackTxOnce = InduceFailure.ThrowException;
- TransactionContext context = newTransactionContext(ds1, ds2);
- // start transaction
- context.start();
- // add a change to ds1 and ds2
- ds1.addChange(A);
- ds2.addChange(B);
- // commit transaction should fail and cause rollback
- try {
- context.finish();
- Assert.fail("Persist should have failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertEquals("persist failure", e.getCause().getMessage());
- }
- // verify both are rolled back and tx is invalidated
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertFalse(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
- Assert.assertTrue(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
- }
-
- @Test
- public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
- ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
- TransactionContext context = newTransactionContext(ds1, ds2);
- // start transaction
- context.start();
- // add a change to ds1 and ds2
- ds1.addChange(A);
- ds2.addChange(B);
- // commit transaction should fail and cause rollback
- try {
- context.finish();
- Assert.fail("Persist should have failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
- }
- // verify both are rolled back and tx is invalidated
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertFalse(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
- Assert.assertTrue(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
- }
-
- @Test
- public void testCommitFalse() throws TransactionFailureException, InterruptedException {
- txClient.failCommits = 1;
- TransactionContext context = newTransactionContext(ds1, ds2);
- // start transaction
- context.start();
- // add a change to ds1 and ds2
- ds1.addChange(A);
- ds2.addChange(B);
- // commit transaction should fail and cause rollback
- try {
- context.finish();
- Assert.fail("commit failed - exception should be thrown");
- } catch (TransactionConflictException e) {
- Assert.assertNull(e.getCause());
- }
- // verify both are rolled back and tx is aborted
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertTrue(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
- Assert.assertTrue(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
- }
-
- @Test
- public void testCanCommitFalse() throws TransactionFailureException, InterruptedException {
- txClient.failCanCommitOnce = true;
- TransactionContext context = newTransactionContext(ds1, ds2);
- // start transaction
- context.start();
- // add a change to ds1 and ds2
- ds1.addChange(A);
- ds2.addChange(B);
- // commit transaction should fail and cause rollback
- try {
- context.finish();
- Assert.fail("commit failed - exception should be thrown");
- } catch (TransactionConflictException e) {
- Assert.assertNull(e.getCause());
- }
- // verify both are rolled back and tx is aborted
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertFalse(ds1.committed);
- Assert.assertFalse(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
- Assert.assertTrue(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
- }
-
- @Test
- public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
- ds1.failChangesTxOnce = InduceFailure.ThrowException;
- ds1.failRollbackTxOnce = InduceFailure.ThrowException;
- TransactionContext context = newTransactionContext(ds1, ds2);
- // start transaction
- context.start();
- // add a change to ds1 and ds2
- ds1.addChange(A);
- ds2.addChange(B);
- // commit transaction should fail and cause rollback
- try {
- context.finish();
- Assert.fail("get changes failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertEquals("changes failure", e.getCause().getMessage());
- }
- // verify both are rolled back and tx is invalidated
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertFalse(ds2.checked);
- Assert.assertFalse(ds1.committed);
- Assert.assertFalse(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
- Assert.assertTrue(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
- }
-
- @Test
- public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
- ds1.failStartTxOnce = InduceFailure.ThrowException;
- TransactionContext context = newTransactionContext(ds1, ds2);
- // start transaction
- try {
- context.start();
- Assert.fail("start failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertEquals("start failure", e.getCause().getMessage());
- }
- // verify both are not rolled back and tx is aborted
- Assert.assertTrue(ds1.started);
- Assert.assertFalse(ds2.started);
- Assert.assertFalse(ds1.checked);
- Assert.assertFalse(ds2.checked);
- Assert.assertFalse(ds1.committed);
- Assert.assertFalse(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertFalse(ds1.rolledBack);
- Assert.assertFalse(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
- }
-
- @Test
- public void testAddThenSuccess() throws TransactionFailureException, InterruptedException {
- TransactionContext context = newTransactionContext(ds1);
- // start transaction
- context.start();
- // add a change to ds1
- ds1.addChange(A);
- // add ds2 to the tx
- context.addTransactionAware(ds2);
- // add a change to ds2
- ds2.addChange(B);
- // commit transaction
- context.finish();
- // verify both are committed and post-committed
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertTrue(ds2.committed);
- Assert.assertTrue(ds1.postCommitted);
- Assert.assertTrue(ds2.postCommitted);
- Assert.assertFalse(ds1.rolledBack);
- Assert.assertFalse(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
- }
-
- @Test
- public void testAddThenFailure() throws TransactionFailureException, InterruptedException {
- ds2.failCommitTxOnce = InduceFailure.ThrowException;
-
- TransactionContext context = newTransactionContext(ds1);
- // start transaction
- context.start();
- // add a change to ds1
- ds1.addChange(A);
- // add ds2 to the tx
- context.addTransactionAware(ds2);
- // add a change to ds2
- ds2.addChange(B);
- // commit transaction should fail and cause rollback
- try {
- context.finish();
- Assert.fail("Persist should have failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertEquals("persist failure", e.getCause().getMessage());
- }
- // verify both are rolled back and tx is aborted
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertTrue(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
- Assert.assertTrue(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
- }
-
- @Test
- public void testAddThenRemoveSuccess() throws TransactionFailureException {
- TransactionContext context = newTransactionContext();
-
- context.start();
- Assert.assertTrue(context.addTransactionAware(ds1));
- ds1.addChange(A);
-
- try {
- context.removeTransactionAware(ds1);
- Assert.fail("Removal of TransactionAware should fails when there is active transaction.");
- } catch (IllegalStateException e) {
- // Expected
- }
-
- context.finish();
-
- Assert.assertTrue(context.removeTransactionAware(ds1));
- // Removing a TransactionAware not added before should returns false
- Assert.assertFalse(context.removeTransactionAware(ds2));
-
- // Verify ds1 is committed and post-committed
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertTrue(ds1.postCommitted);
- Assert.assertFalse(ds1.rolledBack);
-
- // Verify nothing happen to ds2
- Assert.assertFalse(ds2.started);
- Assert.assertFalse(ds2.checked);
- Assert.assertFalse(ds2.committed);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertFalse(ds2.rolledBack);
-
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
- }
-
- @Test
- public void testAndThenRemoveOnFailure() throws TransactionFailureException {
- ds1.failCommitTxOnce = InduceFailure.ThrowException;
- TransactionContext context = newTransactionContext();
-
- context.start();
- Assert.assertTrue(context.addTransactionAware(ds1));
- ds1.addChange(A);
-
- try {
- context.finish();
- Assert.fail("Persist should have failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertEquals("persist failure", e.getCause().getMessage());
- }
-
- Assert.assertTrue(context.removeTransactionAware(ds1));
-
- // Verify ds1 is rolled back
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
-
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
- }
-
- enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
-
- static class DummyTxAware implements TransactionAware {
-
- Transaction tx;
- boolean started = false;
- boolean committed = false;
- boolean checked = false;
- boolean rolledBack = false;
- boolean postCommitted = false;
- List<byte[]> changes = Lists.newArrayList();
-
- InduceFailure failStartTxOnce = InduceFailure.NoFailure;
- InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
- InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
- InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
- InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
-
- void addChange(byte[] key) {
- changes.add(key);
- }
-
- void reset() {
- tx = null;
- started = false;
- checked = false;
- committed = false;
- rolledBack = false;
- postCommitted = false;
- changes.clear();
- }
-
- @Override
- public void startTx(Transaction tx) {
- reset();
- started = true;
- this.tx = tx;
- if (failStartTxOnce == InduceFailure.ThrowException) {
- failStartTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("start failure");
- }
- }
-
- @Override
- public void updateTx(Transaction tx) {
- this.tx = tx;
- }
-
- @Override
- public Collection<byte[]> getTxChanges() {
- checked = true;
- if (failChangesTxOnce == InduceFailure.ThrowException) {
- failChangesTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("changes failure");
- }
- return ImmutableList.copyOf(changes);
- }
-
- @Override
- public boolean commitTx() throws Exception {
- committed = true;
- if (failCommitTxOnce == InduceFailure.ThrowException) {
- failCommitTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("persist failure");
- }
- if (failCommitTxOnce == InduceFailure.ReturnFalse) {
- failCommitTxOnce = InduceFailure.NoFailure;
- return false;
- }
- return true;
- }
-
- @Override
- public void postTxCommit() {
- postCommitted = true;
- if (failPostCommitTxOnce == InduceFailure.ThrowException) {
- failPostCommitTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("post failure");
- }
- }
-
- @Override
- public boolean rollbackTx() throws Exception {
- rolledBack = true;
- if (failRollbackTxOnce == InduceFailure.ThrowException) {
- failRollbackTxOnce = InduceFailure.NoFailure;
- throw new RuntimeException("rollback failure");
- }
- if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
- failRollbackTxOnce = InduceFailure.NoFailure;
- return false;
- }
- return true;
- }
-
- @Override
- public String getTransactionAwareName() {
- return "dummy";
- }
- }
-
- static class DummyTxClient extends InMemoryTxSystemClient {
-
- boolean failCanCommitOnce = false;
- int failCommits = 0;
- enum CommitState {
- Started, Committed, Aborted, Invalidated
- }
- CommitState state = CommitState.Started;
-
- @Inject
- DummyTxClient(TransactionManager txmgr) {
- super(txmgr);
- }
-
- @Override
- public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
- if (failCanCommitOnce) {
- failCanCommitOnce = false;
- return false;
- } else {
- return super.canCommit(tx, changeIds);
- }
- }
-
- @Override
- public boolean commit(Transaction tx) throws TransactionNotInProgressException {
- if (failCommits-- > 0) {
- return false;
- } else {
- state = CommitState.Committed;
- return super.commit(tx);
- }
- }
-
- @Override
- public Transaction startLong() {
- state = CommitState.Started;
- return super.startLong();
- }
-
- @Override
- public Transaction startShort() {
- state = CommitState.Started;
- return super.startShort();
- }
-
- @Override
- public Transaction startShort(int timeout) {
- state = CommitState.Started;
- return super.startShort(timeout);
- }
-
- @Override
- public void abort(Transaction tx) {
- state = CommitState.Aborted;
- super.abort(tx);
- }
-
- @Override
- public boolean invalidate(long tx) {
- state = CommitState.Invalidated;
- return super.invalidate(tx);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java
deleted file mode 100644
index 7e67f2c..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java
+++ /dev/null
@@ -1,590 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.snapshot.DefaultSnapshotCodec;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-import com.google.inject.util.Modules;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * Tests the transaction executor.
- */
-public class TransactionExecutorTest {
- private static DummyTxClient txClient;
- private static TransactionExecutorFactory factory;
-
- @ClassRule
- public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @BeforeClass
- public static void setup() throws IOException {
- final Configuration conf = new Configuration();
- conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
- conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
- Injector injector = Guice.createInjector(
- new ConfigModule(conf),
- new DiscoveryModules().getInMemoryModules(),
- Modules.override(
- new TransactionModules().getInMemoryModules()).with(new AbstractModule() {
- @Override
- protected void configure() {
- TransactionManager txManager = new TransactionManager(conf);
- txManager.startAndWait();
- bind(TransactionManager.class).toInstance(txManager);
- bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class);
- }
- }));
-
- txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class);
- factory = injector.getInstance(TransactionExecutorFactory.class);
- }
-
- final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware();
- final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2);
-
- private TransactionExecutor getExecutor() {
- return factory.createExecutor(txAwares);
- }
-
- private TransactionExecutor getExecutorWithNoRetry() {
- return new DefaultTransactionExecutor(txClient, txAwares, RetryStrategies.noRetries());
- }
-
- static final byte[] A = { 'a' };
- static final byte[] B = { 'b' };
-
- final TransactionExecutor.Function<Integer, Integer> testFunction =
- new TransactionExecutor.Function<Integer, Integer>() {
- @Override
- public Integer apply(@Nullable Integer input) {
- ds1.addChange(A);
- ds2.addChange(B);
- if (input == null) {
- throw new RuntimeException("function failed");
- }
- return input * input;
- }
- };
-
- @Before
- public void resetTxAwares() {
- ds1.reset();
- ds2.reset();
- }
-
- @Test
- public void testSuccessful() throws TransactionFailureException, InterruptedException {
- // execute: add a change to ds1 and ds2
- Integer result = getExecutor().execute(testFunction, 10);
- // verify both are committed and post-committed
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertTrue(ds2.committed);
- Assert.assertTrue(ds1.postCommitted);
- Assert.assertTrue(ds2.postCommitted);
- Assert.assertFalse(ds1.rolledBack);
- Assert.assertFalse(ds2.rolledBack);
- Assert.assertTrue(100 == result);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
- }
-
- @Test
- public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
- ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
- // execute: add a change to ds1 and ds2
- try {
- getExecutor().execute(testFunction, 10);
- Assert.fail("post commit failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertEquals("post failure", e.getCause().getMessage());
- }
- // verify both are committed and post-committed
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertTrue(ds2.committed);
- Assert.assertTrue(ds1.postCommitted);
- Assert.assertTrue(ds2.postCommitted);
- Assert.assertFalse(ds1.rolledBack);
- Assert.assertFalse(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed);
- }
-
- @Test
- public void testPersistFailure() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ThrowException;
- // execute: add a change to ds1 and ds2
- try {
- getExecutor().execute(testFunction, 10);
- Assert.fail("persist failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertEquals("persist failure", e.getCause().getMessage());
- }
- // verify both are rolled back and tx is aborted
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertFalse(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
- Assert.assertTrue(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
- }
-
- @Test
- public void testPersistFalse() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
- // execute: add a change to ds1 and ds2
- try {
- getExecutor().execute(testFunction, 10);
- Assert.fail("persist failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
- }
- // verify both are rolled back and tx is aborted
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertFalse(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
- Assert.assertTrue(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
- }
-
- @Test
- public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
- ds1.failCommitTxOnce = InduceFailure.ThrowException;
- ds1.failRollbackTxOnce = InduceFailure.ThrowException;
- // execute: add a change to ds1 and ds2
- try {
- getExecutor().execute(testFunction, 10);
- Assert.fail("persist failed - exception should be thrown");
- } catch (TransactionFailureException e) {
- Assert.assertEquals("persist failure", e.getCause().getMessage());
- }
- // verify both are rolled back and tx is invalidated
- Assert.assertTrue(ds1.started);
- Assert.assertTrue(ds2.started);
- Assert.assertTrue(ds1.checked);
- Assert.assertTrue(ds2.checked);
- Assert.assertTrue(ds1.committed);
- Assert.assertFalse(ds2.committed);
- Assert.assertFalse(ds1.postCommitted);
- Assert.assertFalse(ds2.postCommitted);
- Assert.assertTrue(ds1.rolledBack);
- Assert.assertTrue(ds2.rolledBack);
- Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated);
- }
-
-  /**
-   * Makes both the commit and the rollback of ds1 return false and verifies that the executor fails
-   * without a cause, rolls back both datasets and invalidates the transaction.
-   */
-  @Test
-  public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
-    ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
-    ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("persist failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
-    }
-    // verify both are rolled back and tx is invalidated
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    // ds1 fails while persisting, so ds2 is never asked to commit
-    Assert.assertTrue(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    // expected value first, so a failure message labels expected/actual correctly
-    Assert.assertEquals(DummyTxClient.CommitState.Invalidated, txClient.state);
-  }
-
-  /**
-   * Makes the transaction client reject 1000 consecutive commits and verifies that the default executor
-   * gives up with a {@link TransactionConflictException} instead of retrying indefinitely.
-   */
-  @Test
-  public void testNoIndefiniteRetryByDefault() throws TransactionFailureException, InterruptedException {
-    // retries must be bounded: with this many failing commits the executor has to give up
-    txClient.failCommits = 1000;
-    try {
-      // execute: add a change to ds1 and ds2
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("commit failed too many times to retry - exception should be thrown");
-    } catch (TransactionConflictException e) {
-      Assert.assertNull(e.getCause());
-    }
-
-    // stop inducing commit failures
-    txClient.failCommits = 0;
-
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    // expected value first, so a failure message labels expected/actual correctly
-    Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
-  }
-
-  /**
-   * Makes the transaction client reject the first two commits and verifies that the default executor
-   * retries until the transaction is committed.
-   */
-  @Test
-  public void testRetryByDefault() throws TransactionFailureException, InterruptedException {
-    // we want retry by default, so that engineers don't miss it
-    txClient.failCommits = 2;
-    // execute: add a change to ds1 and ds2
-    getExecutor().execute(testFunction, 10);
-    // should not fail, but continue
-
-    // verify both are committed
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertTrue(ds1.postCommitted);
-    Assert.assertTrue(ds2.postCommitted);
-    Assert.assertFalse(ds1.rolledBack);
-    Assert.assertFalse(ds2.rolledBack);
-    // expected value first, so a failure message labels expected/actual correctly
-    Assert.assertEquals(DummyTxClient.CommitState.Committed, txClient.state);
-  }
-
-  /**
-   * Makes the transaction client reject one commit and verifies that an executor without retries
-   * fails with a {@link TransactionConflictException}, rolls back both datasets and aborts the transaction.
-   */
-  @Test
-  public void testCommitFalse() throws TransactionFailureException, InterruptedException {
-    txClient.failCommits = 1;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutorWithNoRetry().execute(testFunction, 10);
-      Assert.fail("commit failed - exception should be thrown");
-    } catch (TransactionConflictException e) {
-      Assert.assertNull(e.getCause());
-    }
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertTrue(ds1.committed);
-    Assert.assertTrue(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    // expected value first, so a failure message labels expected/actual correctly
-    Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
-  }
-
-  /**
-   * Makes the transaction client reject one {@code canCommit} call and verifies that an executor without
-   * retries fails with a {@link TransactionConflictException} before any dataset is committed, rolls back
-   * both datasets and aborts the transaction.
-   */
-  @Test
-  public void testCanCommitFalse() throws TransactionFailureException, InterruptedException {
-    txClient.failCanCommitOnce = true;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutorWithNoRetry().execute(testFunction, 10);
-      Assert.fail("commit failed - exception should be thrown");
-    } catch (TransactionConflictException e) {
-      Assert.assertNull(e.getCause());
-    }
-    // verify both are rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertTrue(ds1.checked);
-    Assert.assertTrue(ds2.checked);
-    Assert.assertFalse(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    // expected value first, so a failure message labels expected/actual correctly
-    Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
-  }
-
-  /**
-   * Induces an exception when ds1 is asked for its changes and again when it is rolled back, and verifies
-   * that the executor reports the changes failure as the cause, rolls back both datasets and invalidates
-   * the transaction.
-   */
-  @Test
-  public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failChangesTxOnce = InduceFailure.ThrowException;
-    ds1.failRollbackTxOnce = InduceFailure.ThrowException;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("get changes failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("changes failure", e.getCause().getMessage());
-    }
-    // verify both are rolled back and tx is invalidated
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    // ds1 fails while reporting its changes, so ds2 is never asked for its changes
-    Assert.assertTrue(ds1.checked);
-    Assert.assertFalse(ds2.checked);
-    Assert.assertFalse(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    // expected value first, so a failure message labels expected/actual correctly
-    Assert.assertEquals(DummyTxClient.CommitState.Invalidated, txClient.state);
-  }
-
-  /**
-   * Runs the test function so that it fails (cause message "function failed") while the rollback of ds1
-   * returns false, and verifies that both datasets are rolled back and the transaction is invalidated.
-   */
-  @Test
-  public void testFunctionAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
-    // execute with a null argument; presumably this is what makes testFunction (defined elsewhere) fail
-    try {
-      getExecutor().execute(testFunction, null);
-      Assert.fail("function failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("function failed", e.getCause().getMessage());
-    }
-    // verify both are rolled back and tx is invalidated
-    Assert.assertTrue(ds1.started);
-    Assert.assertTrue(ds2.started);
-    Assert.assertFalse(ds1.checked);
-    Assert.assertFalse(ds2.checked);
-    Assert.assertFalse(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertTrue(ds1.rolledBack);
-    Assert.assertTrue(ds2.rolledBack);
-    // expected value first, so a failure message labels expected/actual correctly
-    Assert.assertEquals(DummyTxClient.CommitState.Invalidated, txClient.state);
-  }
-
-  /**
-   * Induces an exception when the transaction is started on ds1 and verifies that the executor reports
-   * the start failure as the cause, never starts ds2, rolls back neither dataset and aborts the transaction.
-   */
-  @Test
-  public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
-    ds1.failStartTxOnce = InduceFailure.ThrowException;
-    // execute: add a change to ds1 and ds2
-    try {
-      getExecutor().execute(testFunction, 10);
-      Assert.fail("start failed - exception should be thrown");
-    } catch (TransactionFailureException e) {
-      Assert.assertEquals("start failure", e.getCause().getMessage());
-    }
-    // verify both are not rolled back and tx is aborted
-    Assert.assertTrue(ds1.started);
-    Assert.assertFalse(ds2.started);
-    Assert.assertFalse(ds1.checked);
-    Assert.assertFalse(ds2.checked);
-    Assert.assertFalse(ds1.committed);
-    Assert.assertFalse(ds2.committed);
-    Assert.assertFalse(ds1.postCommitted);
-    Assert.assertFalse(ds2.postCommitted);
-    Assert.assertFalse(ds1.rolledBack);
-    Assert.assertFalse(ds2.rolledBack);
-    // expected value first, so a failure message labels expected/actual correctly
-    Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
-  }
-
- enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
-
-  /**
-   * A {@link TransactionAware} test double. It records which lifecycle callbacks have been invoked and
-   * can be told, per callback, to fail once: by throwing a {@link RuntimeException} or, for the callbacks
-   * that return a boolean, by returning {@code false}. An induced failure resets its flag once triggered.
-   */
-  static class DummyTxAware implements TransactionAware {
-
-    // the transaction most recently passed to startTx/updateTx
-    Transaction tx;
-
-    // flags recording which callbacks have run since the last reset()
-    boolean started = false;
-    boolean committed = false;
-    boolean checked = false;
-    boolean rolledBack = false;
-    boolean postCommitted = false;
-    List<byte[]> changes = Lists.newArrayList();
-
-    // one-shot failure switches, one per callback
-    InduceFailure failStartTxOnce = InduceFailure.NoFailure;
-    InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
-    InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
-    InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
-    InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
-
-    /** Records a change key to be returned by {@link #getTxChanges()}. */
-    void addChange(byte[] key) {
-      changes.add(key);
-    }
-
-    /** Clears the recorded transaction, callback flags and changes; the failure switches are left untouched. */
-    void reset() {
-      changes.clear();
-      postCommitted = false;
-      rolledBack = false;
-      committed = false;
-      checked = false;
-      started = false;
-      tx = null;
-    }
-
-    @Override
-    public void startTx(Transaction tx) {
-      reset();
-      this.tx = tx;
-      started = true;
-      if (failStartTxOnce != InduceFailure.ThrowException) {
-        return;
-      }
-      failStartTxOnce = InduceFailure.NoFailure;
-      throw new RuntimeException("start failure");
-    }
-
-    @Override
-    public void updateTx(Transaction tx) {
-      this.tx = tx;
-    }
-
-    @Override
-    public Collection<byte[]> getTxChanges() {
-      checked = true;
-      if (failChangesTxOnce != InduceFailure.ThrowException) {
-        return ImmutableList.copyOf(changes);
-      }
-      failChangesTxOnce = InduceFailure.NoFailure;
-      throw new RuntimeException("changes failure");
-    }
-
-    @Override
-    public boolean commitTx() throws Exception {
-      committed = true;
-      final InduceFailure induced = failCommitTxOnce;
-      final boolean shouldThrow = induced == InduceFailure.ThrowException;
-      final boolean shouldReturnFalse = induced == InduceFailure.ReturnFalse;
-      if (shouldThrow || shouldReturnFalse) {
-        // an induced failure only fires once
-        failCommitTxOnce = InduceFailure.NoFailure;
-      }
-      if (shouldThrow) {
-        throw new RuntimeException("persist failure");
-      }
-      return !shouldReturnFalse;
-    }
-
-    @Override
-    public void postTxCommit() {
-      postCommitted = true;
-      if (failPostCommitTxOnce != InduceFailure.ThrowException) {
-        return;
-      }
-      failPostCommitTxOnce = InduceFailure.NoFailure;
-      throw new RuntimeException("post failure");
-    }
-
-    @Override
-    public boolean rollbackTx() throws Exception {
-      rolledBack = true;
-      final InduceFailure induced = failRollbackTxOnce;
-      final boolean shouldThrow = induced == InduceFailure.ThrowException;
-      final boolean shouldReturnFalse = induced == InduceFailure.ReturnFalse;
-      if (shouldThrow || shouldReturnFalse) {
-        // an induced failure only fires once
-        failRollbackTxOnce = InduceFailure.NoFailure;
-      }
-      if (shouldThrow) {
-        throw new RuntimeException("rollback failure");
-      }
-      return !shouldReturnFalse;
-    }
-
-    @Override
-    public String getTransactionAwareName() {
-      return "dummy";
-    }
-  }
-
-  /**
-   * An {@link InMemoryTxSystemClient} that remembers the last lifecycle operation requested through it
-   * in {@link #state}, and that can be told to reject one {@code canCommit} call or a number of
-   * {@code commit} calls.
-   */
-  static class DummyTxClient extends InMemoryTxSystemClient {
-
-    enum CommitState {
-      Started, Committed, Aborted, Invalidated
-    }
-
-    // when true, the next canCommit() returns false and the flag is cleared
-    boolean failCanCommitOnce = false;
-    // while positive, commit() returns false; decremented on every commit() call
-    int failCommits = 0;
-    CommitState state = CommitState.Started;
-
-    @Inject
-    DummyTxClient(TransactionManager txmgr) {
-      super(txmgr);
-    }
-
-    @Override
-    public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
-      if (!failCanCommitOnce) {
-        return super.canCommit(tx, changeIds);
-      }
-      failCanCommitOnce = false;
-      return false;
-    }
-
-    @Override
-    public boolean commit(Transaction tx) throws TransactionNotInProgressException {
-      final boolean rejectThisCommit = failCommits > 0;
-      // the counter is decremented on every call, whether or not this commit is rejected
-      failCommits--;
-      if (rejectThisCommit) {
-        return false;
-      }
-      state = CommitState.Committed;
-      return super.commit(tx);
-    }
-
-    @Override
-    public Transaction startLong() {
-      state = CommitState.Started;
-      return super.startLong();
-    }
-
-    @Override
-    public Transaction startShort() {
-      state = CommitState.Started;
-      return super.startShort();
-    }
-
-    @Override
-    public Transaction startShort(int timeout) {
-      state = CommitState.Started;
-      return super.startShort(timeout);
-    }
-
-    @Override
-    public void abort(Transaction tx) {
-      state = CommitState.Aborted;
-      super.abort(tx);
-    }
-
-    @Override
-    public boolean invalidate(long tx) {
-      state = CommitState.Invalidated;
-      return super.invalidate(tx);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java
deleted file mode 100644
index ddd32db..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Runs the {@link TransactionSystemTest} suite against a {@link TransactionManager} accessed through an
- * {@link InMemoryTxSystemClient}, and adds tests for transaction timeout cleanup and for truncating the
- * invalid transaction list.
- */
-public class TransactionManagerTest extends TransactionSystemTest {
-
-  // Shared base configuration. Tests that need different settings must work on a copy
-  // (new Configuration(conf)) so that their settings do not leak into other tests.
-  static Configuration conf = new Configuration();
-
-  TransactionManager txManager = null;
-  TransactionStateStorage txStateStorage = null;
-
-  @Override
-  protected TransactionSystemClient getClient() {
-    return new InMemoryTxSystemClient(txManager);
-  }
-
-  @Override
-  protected TransactionStateStorage getStateStorage() {
-    return txStateStorage;
-  }
-
-  /** Starts a fresh transaction manager, without a cleanup thread, on in-memory state storage. */
-  @Before
-  public void before() {
-    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
-    // todo should create two sets of tests, one with LocalFileTxStateStorage and one with InMemoryTxStateStorage
-    txStateStorage = new InMemoryTransactionStateStorage();
-    txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
-    txManager.startAndWait();
-  }
-
-  /** Stops the transaction manager started by {@link #before()}. */
-  @After
-  public void after() {
-    txManager.stopAndWait();
-  }
-
-  /**
-   * Verifies that a short transaction left open is invalidated after its timeout, and checks the sizes
-   * of the committed and invalid sets as further transactions are committed and aborted.
-   */
-  @Test
-  public void testTransactionCleanup() throws Exception {
-    // work on a copy so the timeout settings do not leak into other tests through the shared static conf
-    Configuration testConf = new Configuration(conf);
-    testConf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
-    testConf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2);
-    // using a new tx manager that cleans up
-    TransactionManager txm = new TransactionManager(testConf, new InMemoryTransactionStateStorage(),
-                                                    new TxMetricsCollector());
-    txm.startAndWait();
-    try {
-      Assert.assertEquals(0, txm.getInvalidSize());
-      Assert.assertEquals(0, txm.getCommittedSize());
-      // start a transaction and leave it open
-      Transaction tx1 = txm.startShort();
-      // start a long running transaction and leave it open
-      Transaction tx2 = txm.startLong();
-      Transaction tx3 = txm.startLong();
-      // start and commit a bunch of transactions
-      for (int i = 0; i < 10; i++) {
-        Transaction tx = txm.startShort();
-        Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
-        Assert.assertTrue(txm.commit(tx));
-      }
-      // all of these should still be in the committed set
-      Assert.assertEquals(0, txm.getInvalidSize());
-      Assert.assertEquals(10, txm.getCommittedSize());
-      // sleep longer than the cleanup interval
-      TimeUnit.SECONDS.sleep(5);
-      // transaction should now be invalid
-      Assert.assertEquals(1, txm.getInvalidSize());
-      // run another transaction
-      Transaction txx = txm.startShort();
-      // verify the exclude
-      Assert.assertFalse(txx.isVisible(tx1.getTransactionId()));
-      Assert.assertFalse(txx.isVisible(tx2.getTransactionId()));
-      Assert.assertFalse(txx.isVisible(tx3.getTransactionId()));
-      // try to commit the last transaction that was started
-      Assert.assertTrue(txm.canCommit(txx, Collections.singleton(new byte[] { 0x0a })));
-      Assert.assertTrue(txm.commit(txx));
-
-      // now the committed change sets should be empty again
-      Assert.assertEquals(0, txm.getCommittedSize());
-      // cannot commit transaction as it was timed out
-      try {
-        txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
-        Assert.fail();
-      } catch (TransactionNotInProgressException e) {
-        // expected
-      }
-      txm.abort(tx1);
-      // abort should have removed from invalid
-      Assert.assertEquals(0, txm.getInvalidSize());
-      // run another bunch of transactions
-      for (int i = 0; i < 10; i++) {
-        Transaction tx = txm.startShort();
-        Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
-        Assert.assertTrue(txm.commit(tx));
-      }
-      // none of these should still be in the committed set (tx2 is long-running).
-      Assert.assertEquals(0, txm.getInvalidSize());
-      Assert.assertEquals(0, txm.getCommittedSize());
-      // commit tx2, abort tx3
-      Assert.assertTrue(txm.commit(tx2));
-      txm.abort(tx3);
-      // none of these should still be in the committed set (tx2 is long-running).
-      // Only tx3 is in the invalid list, as it was aborted and is long-running. tx1 is a short one and it
-      // rolled back its changes, so it should NOT be in the invalid list
-      Assert.assertEquals(1, txm.getInvalidSize());
-      Assert.assertEquals(tx3.getTransactionId(), (long) txm.getCurrentState().getInvalid().iterator().next());
-      Assert.assertEquals(1, txm.getExcludedListSize());
-    } finally {
-      txm.stopAndWait();
-    }
-  }
-
-  /**
-   * Verifies that a long-running transaction left open is invalidated after the long timeout and that
-   * aborting it afterwards does not remove it from the invalid list.
-   */
-  @Test
-  public void testLongTransactionCleanup() throws Exception {
-    // work on a copy so the timeout settings do not leak into other tests through the shared static conf
-    Configuration testConf = new Configuration(conf);
-    testConf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
-    testConf.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2);
-    // using a new tx manager that cleans up
-    TransactionManager txm = new TransactionManager(testConf, new InMemoryTransactionStateStorage(),
-                                                    new TxMetricsCollector());
-    txm.startAndWait();
-    try {
-      Assert.assertEquals(0, txm.getInvalidSize());
-      Assert.assertEquals(0, txm.getCommittedSize());
-
-      // start a long running transaction
-      Transaction tx1 = txm.startLong();
-
-      Assert.assertEquals(0, txm.getInvalidSize());
-      Assert.assertEquals(0, txm.getCommittedSize());
-
-      // sleep longer than the cleanup interval
-      TimeUnit.SECONDS.sleep(5);
-
-      // transaction should now be invalid
-      Assert.assertEquals(1, txm.getInvalidSize());
-      Assert.assertEquals(0, txm.getCommittedSize());
-
-      // cannot commit transaction as it was timed out
-      try {
-        txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 }));
-        Assert.fail();
-      } catch (TransactionNotInProgressException e) {
-        // expected
-      }
-
-      txm.abort(tx1);
-      // abort should not remove long running transaction from invalid list
-      Assert.assertEquals(1, txm.getInvalidSize());
-    } finally {
-      txm.stopAndWait();
-    }
-  }
-
-  /**
-   * Verifies that {@code truncateInvalidTx} removes exactly the given ids from the invalid list, has no
-   * effect on in-progress transactions, and that the result survives a replay of the edit logs by a
-   * second transaction manager on the same storage.
-   */
-  @Test
-  public void testTruncateInvalid() throws Exception {
-    InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage();
-    Configuration testConf = new Configuration(conf);
-    // No snapshots
-    testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1);
-    TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector());
-    txm1.startAndWait();
-
-    TransactionManager txm2 = null;
-    Transaction tx1;
-    Transaction tx2;
-    Transaction tx3;
-    Transaction tx4;
-    Transaction tx5;
-    Transaction tx6;
-    try {
-      Assert.assertEquals(0, txm1.getInvalidSize());
-
-      // start a few transactions
-      tx1 = txm1.startLong();
-      tx2 = txm1.startShort();
-      tx3 = txm1.startLong();
-      tx4 = txm1.startShort();
-      tx5 = txm1.startLong();
-      tx6 = txm1.startShort();
-
-      // invalidate tx1, tx2, tx5 and tx6
-      txm1.invalidate(tx1.getTransactionId());
-      txm1.invalidate(tx2.getTransactionId());
-      txm1.invalidate(tx5.getTransactionId());
-      txm1.invalidate(tx6.getTransactionId());
-
-      // tx1, tx2, tx5 and tx6 should be in invalid list
-      Assert.assertEquals(
-        ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(),
-                         tx6.getTransactionId()),
-        txm1.getCurrentState().getInvalid()
-      );
-
-      // remove tx1 and tx6 from invalid list
-      Assert.assertTrue(txm1.truncateInvalidTx(ImmutableSet.of(tx1.getTransactionId(), tx6.getTransactionId())));
-
-      // only tx2 and tx5 should be in invalid list now
-      Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
-                          txm1.getCurrentState().getInvalid());
-
-      // removing in-progress transactions should not have any effect
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm1.getCurrentState().getInProgress().keySet());
-      Assert.assertFalse(txm1.truncateInvalidTx(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId())));
-      // no change to in-progress
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm1.getCurrentState().getInProgress().keySet());
-      // no change to invalid list
-      Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
-                          txm1.getCurrentState().getInvalid());
-
-      // Test transaction edit logs replay
-      // Start another transaction manager without stopping txm1 so that snapshot does not get written,
-      // and all logs can be replayed.
-      txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector());
-      txm2.startAndWait();
-      Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()),
-                          txm2.getCurrentState().getInvalid());
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm2.getCurrentState().getInProgress().keySet());
-    } finally {
-      txm1.stopAndWait();
-      if (txm2 != null) {
-        txm2.stopAndWait();
-      }
-    }
-  }
-
-  /**
-   * Verifies that {@code truncateInvalidTxBefore} removes the invalid transactions started before the
-   * given time, throws {@link InvalidTruncateTimeException} when in-progress transactions were started
-   * before that time, and that the result survives a replay of the edit logs by a second transaction
-   * manager on the same storage.
-   */
-  @Test
-  public void testTruncateInvalidBeforeTime() throws Exception {
-    InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage();
-    Configuration testConf = new Configuration(conf);
-    // No snapshots
-    testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1);
-    TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector());
-    txm1.startAndWait();
-
-    TransactionManager txm2 = null;
-    Transaction tx1;
-    Transaction tx2;
-    Transaction tx3;
-    Transaction tx4;
-    Transaction tx5;
-    Transaction tx6;
-    try {
-      Assert.assertEquals(0, txm1.getInvalidSize());
-
-      // start a few transactions
-      tx1 = txm1.startLong();
-      tx2 = txm1.startShort();
-      // Sleep so that transaction ids get generated a millisecond apart for assertion
-      // TEPHRA-63 should eliminate the need to sleep
-      TimeUnit.MILLISECONDS.sleep(1);
-      long timeBeforeTx3 = System.currentTimeMillis();
-      tx3 = txm1.startLong();
-      tx4 = txm1.startShort();
-      TimeUnit.MILLISECONDS.sleep(1);
-      long timeBeforeTx5 = System.currentTimeMillis();
-      tx5 = txm1.startLong();
-      tx6 = txm1.startShort();
-
-      // invalidate tx1, tx2, tx5 and tx6
-      txm1.invalidate(tx1.getTransactionId());
-      txm1.invalidate(tx2.getTransactionId());
-      txm1.invalidate(tx5.getTransactionId());
-      txm1.invalidate(tx6.getTransactionId());
-
-      // tx1, tx2, tx5 and tx6 should be in invalid list
-      Assert.assertEquals(
-        ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(),
-                         tx6.getTransactionId()),
-        txm1.getCurrentState().getInvalid()
-      );
-
-      // remove transactions before tx3 from invalid list
-      Assert.assertTrue(txm1.truncateInvalidTxBefore(timeBeforeTx3));
-
-      // only tx5 and tx6 should be in invalid list now
-      Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
-                          txm1.getCurrentState().getInvalid());
-
-      // removing invalid transactions before tx5 should throw exception since tx3 and tx4 are in-progress
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm1.getCurrentState().getInProgress().keySet());
-      try {
-        txm1.truncateInvalidTxBefore(timeBeforeTx5);
-        Assert.fail("Expected InvalidTruncateTimeException exception");
-      } catch (InvalidTruncateTimeException e) {
-        // Expected exception
-      }
-      // no change to in-progress
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm1.getCurrentState().getInProgress().keySet());
-      // no change to invalid list
-      Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
-                          txm1.getCurrentState().getInvalid());
-
-      // Test transaction edit logs replay
-      // Start another transaction manager without stopping txm1 so that snapshot does not get written,
-      // and all logs can be replayed.
-      txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector());
-      txm2.startAndWait();
-      Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()),
-                          txm2.getCurrentState().getInvalid());
-      Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()),
-                          txm2.getCurrentState().getInProgress().keySet());
-    } finally {
-      txm1.stopAndWait();
-      if (txm2 != null) {
-        txm2.stopAndWait();
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java
deleted file mode 100644
index e54956d..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.distributed.TransactionServiceClient;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.concurrent.CountDownLatch;
-
-/**
- * Test for verifying TransactionServiceMain works correctly.
- */
-public class TransactionServiceMainTest {
-
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  /**
-   * Starts an in-memory ZooKeeper server and a {@link TransactionServiceMain} on a background thread,
-   * then runs {@link TransactionServiceClient#doMain} against the same configuration.
-   *
-   * @throws Exception if the service fails to start or the client run fails
-   */
-  @Test
-  public void testClientServer() throws Exception {
-    // Simply start a transaction server and connect to it with the client.
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
-    zkServer.startAndWait();
-
-    try {
-      Configuration conf = new Configuration();
-      conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
-      conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
-
-      final TransactionServiceMain main = new TransactionServiceMain(conf);
-      final CountDownLatch latch = new CountDownLatch(1);
-      // Written by the service thread before countDown() and read by the test thread after await(),
-      // so the latch makes the write visible.
-      final Exception[] startFailure = new Exception[1];
-      Thread t = new Thread() {
-        @Override
-        public void run() {
-          try {
-            main.start();
-          } catch (Exception e) {
-            startFailure[0] = e;
-            throw Throwables.propagate(e);
-          } finally {
-            // Always release the test thread: without this, a failed startup would leave
-            // latch.await() blocked forever and hang the test.
-            latch.countDown();
-          }
-        }
-      };
-
-      try {
-        t.start();
-        // Wait for the startup attempt to finish, and fail fast if it did not succeed
-        latch.await();
-        if (startFailure[0] != null) {
-          throw startFailure[0];
-        }
-        TransactionServiceClient.doMain(true, conf);
-      } finally {
-        main.stop();
-        t.join();
-      }
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java
deleted file mode 100644
index cca6f5a..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.persist.TransactionSnapshot;
-import co.cask.tephra.persist.TransactionStateStorage;
-import com.google.common.collect.ImmutableSet;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Abstract suite of tests that exercise a {@link TransactionSystemClient}: commit conflicts, repeated commit and
- * abort, use of transactions that are not in progress, invalidation, state reset and invalid-list truncation.
- * Subclasses supply the client under test and its state storage through the two abstract methods.
- */
-public abstract class TransactionSystemTest {
-
- public static final byte[] C1 = new byte[] { 'c', '1' }; // C1..C4: two-byte ids the tests pass to canCommit()
- public static final byte[] C2 = new byte[] { 'c', '2' };
- public static final byte[] C3 = new byte[] { 'c', '3' };
- public static final byte[] C4 = new byte[] { 'c', '4' };
-
- protected abstract TransactionSystemClient getClient() throws Exception; // client under test
-
- protected abstract TransactionStateStorage getStateStorage() throws Exception; // testResetState reads the latest snapshot from it
-
- @Test
- public void testCommitRaceHandling() throws Exception {
- TransactionSystemClient client1 = getClient();
- TransactionSystemClient client2 = getClient();
-
- Transaction tx1 = client1.startShort();
- Transaction tx2 = client2.startShort();
-
- Assert.assertTrue(client1.canCommit(tx1, asList(C1, C2)));
- // second one also can commit even though there are conflicts with first since first one hasn't committed yet
- Assert.assertTrue(client2.canCommit(tx2, asList(C2, C3)));
-
- Assert.assertTrue(client1.commit(tx1));
-
- // now second one should not commit, since there are conflicts with tx1 that has been committed
- Assert.assertFalse(client2.commit(tx2));
- }
-
- @Test
- public void testMultipleCommitsAtSameTime() throws Exception {
- // We want to check that if two txs finish at same time (wrt tx manager) they do not overwrite changesets of each
- // other in tx manager used for conflicts detection (we had this bug)
- // NOTE: you don't have to use multiple clients for that
- TransactionSystemClient client1 = getClient();
- TransactionSystemClient client2 = getClient();
- TransactionSystemClient client3 = getClient();
- TransactionSystemClient client4 = getClient();
- TransactionSystemClient client5 = getClient();
-
- Transaction tx1 = client1.startShort();
- Transaction tx2 = client2.startShort();
- Transaction tx3 = client3.startShort();
- Transaction tx4 = client4.startShort();
- Transaction tx5 = client5.startShort();
-
- Assert.assertTrue(client1.canCommit(tx1, asList(C1)));
- Assert.assertTrue(client1.commit(tx1));
-
- Assert.assertTrue(client2.canCommit(tx2, asList(C2)));
- Assert.assertTrue(client2.commit(tx2));
-
- // verifying conflicts detection: tx3 and tx4 reuse ids committed by tx1 and tx2, tx5 uses an id nobody committed
- Assert.assertFalse(client3.canCommit(tx3, asList(C1)));
- Assert.assertFalse(client4.canCommit(tx4, asList(C2)));
- Assert.assertTrue(client5.canCommit(tx5, asList(C3)));
- }
-
- @Test
- public void testCommitTwice() throws Exception {
- TransactionSystemClient client = getClient();
- Transaction tx = client.startShort();
-
- Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
- Assert.assertTrue(client.commit(tx));
- // cannot commit twice same tx
- try {
- Assert.assertFalse(client.commit(tx));
- Assert.fail(); // reached only if commit() returned false instead of throwing
- } catch (TransactionNotInProgressException e) {
- // expected
- }
- }
-
- @Test
- public void testAbortTwice() throws Exception {
- TransactionSystemClient client = getClient();
- Transaction tx = client.startShort();
-
- Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
- client.abort(tx);
- // abort of not active tx has no effect (the test only checks that it does not throw)
- client.abort(tx);
- }
-
- @Test
- public void testReuseTx() throws Exception {
- TransactionSystemClient client = getClient();
- Transaction tx = client.startShort();
-
- Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
- Assert.assertTrue(client.commit(tx));
- // can't re-use same tx again
- try {
- client.canCommit(tx, asList(C3, C4));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
- try {
- Assert.assertFalse(client.commit(tx));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
-
- // abort of not active tx has no effect
- client.abort(tx);
- }
-
- @Test
- public void testUseNotStarted() throws Exception {
- TransactionSystemClient client = getClient();
- Transaction tx1 = client.startShort();
- Assert.assertTrue(client.commit(tx1));
-
- // we know this one is older than current writePointer and was not used
- Transaction txOld = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() - 1,
- new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,
- TransactionType.SHORT);
- try {
- Assert.assertFalse(client.canCommit(txOld, asList(C3, C4)));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
- try {
- Assert.assertFalse(client.commit(txOld));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
- // abort of not active tx has no effect
- client.abort(txOld);
-
- // we know this one is newer than current readPointer and was not used
- Transaction txNew = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() + 1,
- new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,
- TransactionType.SHORT);
- try {
- Assert.assertFalse(client.canCommit(txNew, asList(C3, C4)));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
- try {
- Assert.assertFalse(client.commit(txNew));
- Assert.fail();
- } catch (TransactionNotInProgressException e) {
- // expected
- }
- // abort of not active tx has no effect
- client.abort(txNew);
- }
-
- @Test
- public void testAbortAfterCommit() throws Exception {
- TransactionSystemClient client = getClient();
- Transaction tx = client.startShort();
-
- Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
- Assert.assertTrue(client.commit(tx));
- // abort of not active tx has no effect
- client.abort(tx);
- }
-
- // todo add test invalidate method (NOTE(review): testInvalidateTx below already exercises invalidate(); this todo may be stale)
- @Test
- public void testInvalidateTx() throws Exception {
- TransactionSystemClient client = getClient();
- // Invalidate an in-progress tx
- Transaction tx1 = client.startShort();
- client.canCommit(tx1, asList(C1, C2));
- Assert.assertTrue(client.invalidate(tx1.getTransactionId()));
- // Cannot invalidate a committed tx
- Transaction tx2 = client.startShort();
- client.canCommit(tx2, asList(C3, C4));
- client.commit(tx2);
- Assert.assertFalse(client.invalidate(tx2.getTransactionId()));
- }
-
- @Test
- public void testResetState() throws Exception {
- // have tx in progress, committing and committed then reset,
- // get the last snapshot and see that it is empty
- TransactionSystemClient client = getClient();
- TransactionStateStorage stateStorage = getStateStorage();
-
- Transaction tx1 = client.startShort();
- Transaction tx2 = client.startShort();
- client.canCommit(tx1, asList(C1, C2));
- client.commit(tx1);
- client.canCommit(tx2, asList(C3, C4));
-
- Transaction txPreReset = client.startShort();
- long currentTs = System.currentTimeMillis(); // taken just before the reset; the snapshot timestamp must not be older
- client.resetState();
-
- TransactionSnapshot snapshot = stateStorage.getLatestSnapshot();
- Assert.assertTrue(snapshot.getTimestamp() >= currentTs);
- Assert.assertEquals(0, snapshot.getInvalid().size());
- Assert.assertEquals(0, snapshot.getInProgress().size());
- Assert.assertEquals(0, snapshot.getCommittingChangeSets().size());
- Assert.assertEquals(0, snapshot.getCommittedChangeSets().size());
-
- // confirm that transaction IDs are not reset
- Transaction txPostReset = client.startShort();
- Assert.assertTrue("New tx ID should be greater than last ID before reset",
- txPostReset.getTransactionId() > txPreReset.getTransactionId());
- }
-
- @Test
- public void testTruncateInvalidTx() throws Exception {
- // Start few transactions and invalidate all of them
- TransactionSystemClient client = getClient();
- Transaction tx1 = client.startLong();
- Transaction tx2 = client.startShort();
- Transaction tx3 = client.startLong();
-
- client.invalidate(tx1.getTransactionId());
- client.invalidate(tx2.getTransactionId());
- client.invalidate(tx3.getTransactionId());
-
- // Remove tx2 and tx3 from invalid list
- Assert.assertTrue(client.truncateInvalidTx(ImmutableSet.of(tx2.getTransactionId(), tx3.getTransactionId())));
-
- Transaction tx = client.startShort();
- // Only tx1 should be in invalid list now
- Assert.assertArrayEquals(new long[] {tx1.getTransactionId()}, tx.getInvalids());
- client.abort(tx);
- }
-
- @Test
- public void testTruncateInvalidTxBefore() throws Exception {
- // Start few transactions
- TransactionSystemClient client = getClient();
- Transaction tx1 = client.startLong();
- Transaction tx2 = client.startShort();
- // Sleep so that transaction ids get generated a millisecond apart for assertion
- // TEPHRA-63 should eliminate the need to sleep
- TimeUnit.MILLISECONDS.sleep(1);
- long beforeTx3 = System.currentTimeMillis();
- Transaction tx3 = client.startLong();
-
- try {
- // throws exception since tx1 and tx2 are still in-progress
- client.truncateInvalidTxBefore(beforeTx3);
- Assert.fail("Expected InvalidTruncateTimeException exception");
- } catch (InvalidTruncateTimeException e) {
- // Expected exception
- }
-
- // Invalidate all of them
- client.invalidate(tx1.getTransactionId());
- client.invalidate(tx2.getTransactionId());
- client.invalidate(tx3.getTransactionId());
-
- // Remove transactions before time beforeTx3
- Assert.assertTrue(client.truncateInvalidTxBefore(beforeTx3));
-
- Transaction tx = client.startShort();
- // Only tx3 should be in invalid list now
- Assert.assertArrayEquals(new long[] {tx3.getTransactionId()}, tx.getInvalids());
- client.abort(tx);
- }
-
- @Test
- public void testGetInvalidSize() throws Exception {
- // Start few transactions and invalidate all of them
- TransactionSystemClient client = getClient();
- Transaction tx1 = client.startLong();
- Transaction tx2 = client.startShort();
- Transaction tx3 = client.startLong();
-
- Assert.assertEquals(0, client.getInvalidSize()); // nothing invalidated yet
-
- client.invalidate(tx1.getTransactionId());
- client.invalidate(tx2.getTransactionId());
- client.invalidate(tx3.getTransactionId());
-
- Assert.assertEquals(3, client.getInvalidSize());
- }
-
- private Collection<byte[]> asList(byte[]... val) { // varargs shorthand for building the collection passed to canCommit()
- return Arrays.asList(val);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java
deleted file mode 100644
index b1c7e43..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.primitives.Longs;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Set;
-
-/**
- * Tests {@code Transaction.isVisible} under each {@link Transaction.VisibilityLevel}, using a fixed transaction id
- * and read pointer together with fixed sets of committed, invalid and in-progress versions.
- *
- * NOTE(review): in the {@code Transaction} constructor calls below, the third argument (250L or txId) is presumably
- * the current write pointer and 95L the first in-progress short transaction; confirm against the constructor.
- */
-public class TransactionTest {
- // Current transaction id
- private final long txId = 200L;
- // Read pointer for current transaction
- private final long readPointer = 100L;
- // Transactions committed before current transaction was started.
- private final Set<Long> priorCommitted = ImmutableSet.of(80L, 99L, 100L);
- // Transactions committed after current transaction was started.
- private final Set<Long> postCommitted = ImmutableSet.of(150L, 180L, 210L);
- // Invalid transactions before current transaction was started.
- private final Set<Long> priorInvalids = ImmutableSet.of(90L, 110L, 190L);
- // Invalid transactions after current transaction was started.
- private final Set<Long> postInvalids = ImmutableSet.of(201L, 221L, 231L);
- // Transactions in progress before current transaction was started.
- private final Set<Long> priorInProgress = ImmutableSet.of(95L, 120L, 150L);
- // Transactions in progress after current transaction was started.
- private final Set<Long> postInProgress = ImmutableSet.of(205L, 215L, 300L);
-
- @Test
- public void testSnapshotVisibility() throws Exception {
- Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT;
-
- Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
- Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
- toSortedArray(priorInProgress), 95L,
- TransactionType.SHORT, toSortedArray(checkPointers),
- visibilityLevel);
- Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L); // txId plus both checkpoint pointers
- Set<Long> notVisibleCurrent = ImmutableSet.of();
- assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
- visibleCurrent, notVisibleCurrent, tx);
-
- checkPointers = ImmutableSet.of(); // second case: no checkpoints, third constructor argument equals txId
- tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L,
- TransactionType.SHORT, toSortedArray(checkPointers),
- visibilityLevel);
- visibleCurrent = ImmutableSet.of(txId);
- notVisibleCurrent = ImmutableSet.of();
- assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
- visibleCurrent, notVisibleCurrent, tx);
- }
-
- @Test
- public void testSnapshotExcludeVisibility() throws Exception {
- Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
-
- Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
- Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
- toSortedArray(priorInProgress), 95L,
- TransactionType.SHORT, toSortedArray(checkPointers),
- visibilityLevel);
- Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L);
- Set<Long> notVisibleCurrent = ImmutableSet.of(250L); // 250L is the third constructor argument as well as a checkpoint
- assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
- visibleCurrent, notVisibleCurrent, tx);
-
- checkPointers = ImmutableSet.of(); // second case: no checkpoints, third constructor argument equals txId
- tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L,
- TransactionType.SHORT, toSortedArray(checkPointers),
- visibilityLevel);
- visibleCurrent = ImmutableSet.of();
- notVisibleCurrent = ImmutableSet.of(txId);
- assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
- visibleCurrent, notVisibleCurrent, tx);
- }
-
- @Test
- public void testSnapshotAllVisibility() throws Exception {
- Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_ALL;
-
- Set<Long> checkPointers = ImmutableSet.of(220L, 250L);
- Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids),
- toSortedArray(priorInProgress), 95L,
- TransactionType.SHORT, toSortedArray(checkPointers),
- visibilityLevel);
- Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L); // txId plus both checkpoint pointers
- Set<Long> notVisibleCurrent = ImmutableSet.of();
- assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
- visibleCurrent, notVisibleCurrent, tx);
-
- checkPointers = ImmutableSet.of(); // second case: no checkpoints, third constructor argument equals txId
- tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids),
- toSortedArray(priorInProgress), 95L,
- TransactionType.SHORT, toSortedArray(checkPointers),
- visibilityLevel);
- visibleCurrent = ImmutableSet.of(txId);
- notVisibleCurrent = ImmutableSet.of();
- assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress,
- visibleCurrent, notVisibleCurrent, tx);
- }
-
- private void assertVisibility(Set<Long> priorCommitted, Set<Long> postCommitted, Set<Long> priorInvalids,
- Set<Long> postInvalids, Set<Long> priorInProgress, Set<Long> postInProgress,
- Set<Long> visibleCurrent, Set<Long> notVisibleCurrent,
- Transaction tx) { // the Set parameters shadow the same-named fields of this class
- // Verify visible snapshots of tx are visible
- for (long t : visibleCurrent) {
- Assert.assertTrue("Assertion error for version = " + t, tx.isVisible(t));
- }
-
- // Verify not visible snapshots of tx are not visible
- for (long t : notVisibleCurrent) {
- Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
- }
-
- // Verify prior committed versions are visible
- for (long t : priorCommitted) {
- Assert.assertTrue("Assertion error for version = " + t, tx.isVisible(t));
- }
-
- // Verify versions committed after tx started, and not part of tx are not visible
- for (long t : postCommitted) {
- Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
- }
-
- // Verify invalid and in-progress versions are not visible
- for (long t : Iterables.concat(priorInvalids, postInvalids, priorInProgress, postInProgress)) {
- Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t));
- }
- }
-
- private long[] toSortedArray(Set<Long> set) { // copies the set into a long[] and sorts it in ascending order
- long[] array = Longs.toArray(set);
- Arrays.sort(array);
- return array;
- }
-}