You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:28 UTC
[28/56] [abbrv] [partial] incubator-tephra git commit: Rename package
to org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java
deleted file mode 100644
index 4a3af3c..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionType;
-import com.google.common.collect.Sets;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-/**
- * test for {@link TransactionEdit}
- */
-public class TransactionEditTest {
- private static final byte[] COL = new byte[] {'c'};
-
- @Test
- public void testV1SerdeCompat() throws Exception {
- TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV1();
- // start tx edit and committed tx edit cover all fields of tx edit
- // NOTE: set visibilityUpperBound to 0 and transaction type to null as this is expected default
- // for decoding older versions that doesn't store it
- verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 0L, 1000L, null), olderCodec);
- verifyDecodingSupportsOlderVersion(
- TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec);
- }
-
- @Test
- public void testV2SerdeCompat() throws Exception {
- TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV2();
- // start tx edit and committed tx edit cover all fields of tx edit
- // NOTE: transaction type to null as this is expected default for decoding older versions that doesn't store it
- verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 100L, 1000L, null), olderCodec);
- verifyDecodingSupportsOlderVersion(
- TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec);
- }
-
- @SuppressWarnings("deprecation")
- private void verifyDecodingSupportsOlderVersion(TransactionEdit edit,
- TransactionEditCodecs.TransactionEditCodec olderCodec)
- throws IOException {
- // encoding with older version of codec
- ByteArrayDataOutput out = ByteStreams.newDataOutput();
- TransactionEditCodecs.encode(edit, out, olderCodec);
-
- // decoding
- TransactionEdit decodedEdit = new TransactionEdit();
- DataInput in = ByteStreams.newDataInput(out.toByteArray());
- decodedEdit.readFields(in);
-
- Assert.assertEquals(edit, decodedEdit);
- }
-
- @Test
- public void testSerialization() throws Exception {
- assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[0]));
- assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[]{ 2L, 3L }));
- assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[0]));
- assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[]{ 2L, 3L }));
-
- assertSerializedEdit(TransactionEdit.createCheckpoint(2L, 1L));
-
- assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, false));
- assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, true));
- assertSerializedEdit(TransactionEdit.createCommitted(1L,
- Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'})), 2L, false));
- assertSerializedEdit(TransactionEdit.createCommitted(1L,
- Sets.newHashSet(new ChangeId(new byte[]{ 'a', 'b', 'c' }), new ChangeId(new byte[]{ 'd', 'e', 'f' })),
- 2L, true));
-
- assertSerializedEdit(TransactionEdit.createCommitting(1L, Sets.<ChangeId>newHashSet()));
- assertSerializedEdit(TransactionEdit.createCommitting(1L,
- Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'}))));
- assertSerializedEdit(TransactionEdit.createCommitting(1L,
- Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'}), new ChangeId(new byte[]{'d', 'e', 'f'}))));
-
- assertSerializedEdit(TransactionEdit.createInvalid(1L));
-
- assertSerializedEdit(TransactionEdit.createMoveWatermark(10L));
-
- assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 1000,
- TransactionType.SHORT));
- assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 10000,
- TransactionType.LONG));
-
- assertSerializedEdit(TransactionEdit.createTruncateInvalidTx(Sets.newHashSet(new Long(1))));
- assertSerializedEdit(TransactionEdit.createTruncateInvalidTx(
- Sets.newHashSet(new Long(1), new Long(2), new Long(3))));
-
- assertSerializedEdit(TransactionEdit.createTruncateInvalidTxBefore(System.currentTimeMillis()));
- }
-
- private void assertSerializedEdit(TransactionEdit originalEdit) throws IOException {
- ByteArrayDataOutput out = ByteStreams.newDataOutput();
- originalEdit.write(out);
-
- TransactionEdit decodedEdit = new TransactionEdit();
- DataInput in = ByteStreams.newDataInput(out.toByteArray());
- decodedEdit.readFields(in);
-
- Assert.assertEquals(originalEdit, decodedEdit);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java
deleted file mode 100644
index f3fe2e1..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionNotInProgressException;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.persist.TransactionSnapshot;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.persist.TransactionVisibilityState;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionModules;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests related to {@link SnapshotCodec} implementations.
- */
-public class SnapshotCodecTest {
- @ClassRule
- public static TemporaryFolder tmpDir = new TemporaryFolder();
-
- @Test
- public void testMinimalDeserilization() throws Exception {
- long now = System.currentTimeMillis();
- long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
- /*
- * Snapshot consisting of transactions at:
- */
- long tInvalid = nowWritePointer - 5; // t1 - invalid
- long readPtr = nowWritePointer - 4; // t2 - here and earlier committed
- long tLong = nowWritePointer - 3; // t3 - in-progress LONG
- long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2)
- long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
-
- TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
- tLong, new TransactionManager.InProgressTx(readPtr,
- TransactionManager.getTxExpirationFromWritePointer(
- tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
- TransactionType.LONG),
- tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
-
- TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
- Lists.newArrayList(tInvalid), // invalid
- inProgress, ImmutableMap.<Long, Set<ChangeId>>of(
- tShort, Sets.<ChangeId>newHashSet()),
- ImmutableMap.<Long, Set<ChangeId>>of(
- tCommitted, Sets.<ChangeId>newHashSet()));
-
- Configuration conf1 = new Configuration();
- conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
- SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
-
- byte[] byteArray;
- try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- provider1.encode(out, snapshot);
- byteArray = out.toByteArray();
- }
-
- // TransactionSnapshot and TransactionVisibilityState decode should pass now
- TransactionSnapshot txSnapshot = provider1.decode(new ByteArrayInputStream(byteArray));
- TransactionVisibilityState txVisibilityState =
- provider1.decodeTransactionVisibilityState(new ByteArrayInputStream(byteArray));
- assertTransactionVisibilityStateEquals(txSnapshot, txVisibilityState);
-
- // Corrupt the serialization byte array so that full deserialization will fail
- byteArray[byteArray.length - 1] = 'a';
-
- // TransactionVisibilityState decoding should pass since it doesn't decode the committing and committed changesets.
- TransactionVisibilityState txVisibilityState2 = provider1.decodeTransactionVisibilityState(
- new ByteArrayInputStream(byteArray));
- Assert.assertNotNull(txVisibilityState2);
- Assert.assertEquals(txVisibilityState, txVisibilityState2);
- Assert.assertEquals(readPtr, txVisibilityState2.getReadPointer());
- try {
- provider1.decode(new ByteArrayInputStream(byteArray));
- Assert.fail();
- } catch (RuntimeException e) {
- // expected since we modified the serialization bytes
- }
- }
-
- /**
- * In-progress LONG transactions written with DefaultSnapshotCodec will not have the type serialized as part of
- * the data. Since these transactions also contain a non-negative expiration, we need to ensure we reset the type
- * correctly when the snapshot is loaded.
- */
- @Test
- public void testDefaultToV3Compatibility() throws Exception {
- long now = System.currentTimeMillis();
- long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
- /*
- * Snapshot consisting of transactions at:
- */
- long tInvalid = nowWritePointer - 5; // t1 - invalid
- long readPtr = nowWritePointer - 4; // t2 - here and earlier committed
- long tLong = nowWritePointer - 3; // t3 - in-progress LONG
- long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2)
- long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
-
- TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
- tLong, new TransactionManager.InProgressTx(readPtr,
- TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
- TransactionType.LONG),
- tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
-
- TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
- Lists.newArrayList(tInvalid), // invalid
- inProgress,
- ImmutableMap.<Long, Set<ChangeId>>of(
- tShort, Sets.newHashSet(new ChangeId(new byte[]{'r', '3'}), new ChangeId(new byte[]{'r', '4'}))),
- ImmutableMap.<Long, Set<ChangeId>>of(
- tCommitted, Sets.newHashSet(new ChangeId(new byte[]{'r', '1'}), new ChangeId(new byte[]{'r', '2'}))));
-
- Configuration conf1 = new Configuration();
- conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
- SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
-
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- try {
- provider1.encode(out, snapshot);
- } finally {
- out.close();
- }
-
- TransactionSnapshot snapshot2 = provider1.decode(new ByteArrayInputStream(out.toByteArray()));
- TransactionVisibilityState minTxSnapshot = provider1.decodeTransactionVisibilityState(
- new ByteArrayInputStream(out.toByteArray()));
- assertTransactionVisibilityStateEquals(snapshot2, minTxSnapshot);
-
- assertEquals(snapshot.getReadPointer(), snapshot2.getReadPointer());
- assertEquals(snapshot.getWritePointer(), snapshot2.getWritePointer());
- assertEquals(snapshot.getInvalid(), snapshot2.getInvalid());
- // in-progress transactions will have missing types
- assertNotEquals(snapshot.getInProgress(), snapshot2.getInProgress());
- assertEquals(snapshot.getCommittingChangeSets(), snapshot2.getCommittingChangeSets());
- assertEquals(snapshot.getCommittedChangeSets(), snapshot2.getCommittedChangeSets());
-
- // after fixing in-progress, full snapshot should match
- Map<Long, TransactionManager.InProgressTx> fixedInProgress = TransactionManager.txnBackwardsCompatCheck(
- TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT, 10000L, snapshot2.getInProgress());
- assertEquals(snapshot.getInProgress(), fixedInProgress);
- assertEquals(snapshot, snapshot2);
- }
-
- /**
- * Test full stack serialization for a TransactionManager migrating from DefaultSnapshotCodec to SnapshotCodecV3.
- */
- @Test
- public void testDefaultToV3Migration() throws Exception {
- File testDir = tmpDir.newFolder("testDefaultToV3Migration");
- Configuration conf = new Configuration();
- conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
- conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
-
- Injector injector = Guice.createInjector(new ConfigModule(conf),
- new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
-
- TransactionManager txManager = injector.getInstance(TransactionManager.class);
- txManager.startAndWait();
-
- txManager.startLong();
-
- // shutdown to force a snapshot
- txManager.stopAndWait();
-
- TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
- txStorage.startAndWait();
-
- // confirm that the in-progress entry is missing a type
- TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
- TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
- assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
- assertNotNull(snapshot);
- assertEquals(1, snapshot.getInProgress().size());
- Map.Entry<Long, TransactionManager.InProgressTx> entry =
- snapshot.getInProgress().entrySet().iterator().next();
- assertNull(entry.getValue().getType());
- txStorage.stopAndWait();
-
-
- // start a new Tx manager to test fixup
- Configuration conf2 = new Configuration();
- conf2.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
- conf2.setStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES,
- DefaultSnapshotCodec.class.getName(), SnapshotCodecV3.class.getName());
- Injector injector2 = Guice.createInjector(new ConfigModule(conf2),
- new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
-
- TransactionManager txManager2 = injector2.getInstance(TransactionManager.class);
- txManager2.startAndWait();
-
- // state should be recovered
- TransactionSnapshot snapshot2 = txManager2.getCurrentState();
- assertEquals(1, snapshot2.getInProgress().size());
- Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx =
- snapshot2.getInProgress().entrySet().iterator().next();
- assertEquals(TransactionType.LONG, inProgressTx.getValue().getType());
-
- // save a new snapshot
- txManager2.stopAndWait();
-
- TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
- txStorage2.startAndWait();
-
- TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot();
- // full snapshot should have deserialized correctly without any fixups
- assertEquals(snapshot2.getInProgress(), snapshot3.getInProgress());
- assertEquals(snapshot2, snapshot3);
- txStorage2.stopAndWait();
- }
-
- @Test
- public void testSnapshotCodecProviderConfiguration() throws Exception {
- Configuration conf = new Configuration(false);
- StringBuilder buf = new StringBuilder();
- for (Class c : TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES) {
- if (buf.length() > 0) {
- buf.append(",\n ");
- }
- buf.append(c.getName());
- }
- conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, buf.toString());
-
- SnapshotCodecProvider codecProvider = new SnapshotCodecProvider(conf);
- SnapshotCodec v1codec = codecProvider.getCodecForVersion(new DefaultSnapshotCodec().getVersion());
- assertNotNull(v1codec);
- assertTrue(v1codec instanceof DefaultSnapshotCodec);
-
- SnapshotCodec v2codec = codecProvider.getCodecForVersion(new SnapshotCodecV2().getVersion());
- assertNotNull(v2codec);
- assertTrue(v2codec instanceof SnapshotCodecV2);
-
- SnapshotCodec v3codec = codecProvider.getCodecForVersion(new SnapshotCodecV3().getVersion());
- assertNotNull(v3codec);
- assertTrue(v3codec instanceof SnapshotCodecV3);
-
- SnapshotCodec v4codec = codecProvider.getCodecForVersion(new SnapshotCodecV4().getVersion());
- assertNotNull(v4codec);
- assertTrue(v4codec instanceof SnapshotCodecV4);
- }
-
- @Test
- public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressException {
- File testDir = tmpDir.newFolder("testSnapshotCodecV4");
- Configuration conf = new Configuration();
- conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
- conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
-
- Injector injector = Guice.createInjector(new ConfigModule(conf),
- new DiscoveryModules().getSingleNodeModules(),
- new TransactionModules().getSingleNodeModules());
-
- TransactionManager txManager = injector.getInstance(TransactionManager.class);
- txManager.startAndWait();
-
- // Create a transaction and a checkpoint transaction
- Transaction transaction = txManager.startLong();
- Transaction checkpointTx = txManager.checkpoint(transaction);
-
- // shutdown to force a snapshot
- txManager.stopAndWait();
-
- // Validate the snapshot on disk
- TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
- txStorage.startAndWait();
-
- TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
- TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
- assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
-
- Map<Long, TransactionManager.InProgressTx> inProgress = snapshot.getInProgress();
- Assert.assertEquals(1, inProgress.size());
-
- TransactionManager.InProgressTx inProgressTx = inProgress.get(transaction.getTransactionId());
- Assert.assertNotNull(inProgressTx);
- Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
- inProgressTx.getCheckpointWritePointers().toLongArray());
-
- txStorage.stopAndWait();
-
- // start a new Tx manager to see if the transaction is restored correctly.
- Injector injector2 = Guice.createInjector(new ConfigModule(conf),
- new DiscoveryModules().getSingleNodeModules(),
- new TransactionModules().getSingleNodeModules());
-
- txManager = injector2.getInstance(TransactionManager.class);
- txManager.startAndWait();
-
- // state should be recovered
- snapshot = txManager.getCurrentState();
- inProgress = snapshot.getInProgress();
- Assert.assertEquals(1, inProgress.size());
-
- inProgressTx = inProgress.get(transaction.getTransactionId());
- Assert.assertNotNull(inProgressTx);
- Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
- inProgressTx.getCheckpointWritePointers().toLongArray());
-
- // Should be able to commit the transaction
- Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList()));
- Assert.assertTrue(txManager.commit(checkpointTx));
-
- // save a new snapshot
- txManager.stopAndWait();
-
- TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
- txStorage2.startAndWait();
-
- snapshot = txStorage2.getLatestSnapshot();
- Assert.assertTrue(snapshot.getInProgress().isEmpty());
- txStorage2.stopAndWait();
- }
-
- private void assertTransactionVisibilityStateEquals(TransactionVisibilityState expected,
- TransactionVisibilityState input) {
- Assert.assertEquals(expected.getTimestamp(), input.getTimestamp());
- Assert.assertEquals(expected.getReadPointer(), input.getReadPointer());
- Assert.assertEquals(expected.getWritePointer(), input.getWritePointer());
- Assert.assertEquals(expected.getInProgress(), input.getInProgress());
- Assert.assertEquals(expected.getInvalid(), input.getInvalid());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java b/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java
deleted file mode 100644
index 0ae9ed4..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- *
- */
-public abstract class AbstractConfigurationProviderTest {
- @Test
- public void testVersionFactory() {
- HBaseVersion.Version foundVersion = HBaseVersion.get();
- assertEquals(getExpectedVersion(), foundVersion);
- }
-
- protected abstract HBaseVersion.Version getExpectedVersion();
-
- @Test
- public void testConfigurationProvider() {
- Configuration conf = new Configuration();
- conf.set("foo", "bar");
- Configuration newConf = new ConfigurationFactory().get(conf);
- assertNotNull(newConf);
- assertEquals("bar", newConf.get("foo"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java b/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java
deleted file mode 100644
index 1bb1fe2..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import org.junit.Test;
-
-import java.text.ParseException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for HBase version parsing.
- */
-public class HBaseVersionTest {
- @Test
- public void testVersionNumber() throws Exception {
- HBaseVersion.VersionNumber ver = HBaseVersion.VersionNumber.create("1");
- assertVersionNumber(ver, 1, null, null, null, false);
-
- ver = HBaseVersion.VersionNumber.create("1-SNAPSHOT");
- assertVersionNumber(ver, 1, null, null, null, true);
-
- ver = HBaseVersion.VersionNumber.create("1-foo");
- assertVersionNumber(ver, 1, null, null, "foo", false);
-
- ver = HBaseVersion.VersionNumber.create("1-foo-SNAPSHOT");
- assertVersionNumber(ver, 1, null, null, "foo", true);
-
- ver = HBaseVersion.VersionNumber.create("10.0");
- assertVersionNumber(ver, 10, 0, null, null, false);
-
- ver = HBaseVersion.VersionNumber.create("10.0-bar");
- assertVersionNumber(ver, 10, 0, null, "bar", false);
-
- ver = HBaseVersion.VersionNumber.create("3.2.1");
- assertVersionNumber(ver, 3, 2, 1, null, false);
-
- ver = HBaseVersion.VersionNumber.create("3.2.1-SNAPSHOT");
- assertVersionNumber(ver, 3, 2, 1, null, true);
-
- ver = HBaseVersion.VersionNumber.create("3.2.1-baz");
- assertVersionNumber(ver, 3, 2, 1, "baz", false);
-
- ver = HBaseVersion.VersionNumber.create("3.2.1-baz1.2.3");
- assertVersionNumber(ver, 3, 2, 1, "baz1.2.3", false);
-
- ver = HBaseVersion.VersionNumber.create("3.2.1-baz1.2.3-SNAPSHOT");
- assertVersionNumber(ver, 3, 2, 1, "baz1.2.3", true);
-
- try {
- ver = HBaseVersion.VersionNumber.create("abc");
- fail("Invalid verison number 'abc' should have thrown a ParseException");
- } catch (ParseException pe) {
- // expected
- }
-
- try {
- ver = HBaseVersion.VersionNumber.create("1.a.b");
- fail("Invalid verison number '1.a.b' should have thrown a ParseException");
- } catch (ParseException pe) {
- // expected
- }
-
- ver = HBaseVersion.VersionNumber.create("1.2.0-CDH5.7.0");
- assertVersionNumber(ver, 1, 2, 0, "CDH5.7.0", false);
- }
-
- private void assertVersionNumber(HBaseVersion.VersionNumber version, Integer expectedMajor, Integer expectedMinor,
- Integer expectedPatch, String expectedClassifier, boolean snapshot) {
- if (expectedMajor == null) {
- assertNull(version.getMajor());
- } else {
- assertEquals(expectedMajor, version.getMajor());
- }
- if (expectedMinor == null) {
- assertNull(version.getMinor());
- } else {
- assertEquals(expectedMinor, version.getMinor());
- }
- if (expectedPatch == null) {
- assertNull(version.getPatch());
- } else {
- assertEquals(expectedPatch, version.getPatch());
- }
- if (expectedClassifier == null) {
- assertNull(version.getClassifier());
- } else {
- assertEquals(expectedClassifier, version.getClassifier());
- }
- assertEquals(snapshot, version.isSnapshot());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java b/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java
deleted file mode 100644
index c3269b0..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.persist.TransactionEdit;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * Util class for {@link TransactionEdit} related tests.
- */
-public final class TransactionEditUtil {
- private static Random random = new Random();
-
- /**
- * Generates a number of semi-random {@link TransactionEdit} instances.
- * These are just randomly selected from the possible states, so would not necessarily reflect a real-world
- * distribution.
- *
- * @param numEntries how many entries to generate in the returned list.
- * @return a list of randomly generated transaction log edits.
- */
- public static List<TransactionEdit> createRandomEdits(int numEntries) {
- List<TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries);
- for (int i = 0; i < numEntries; i++) {
- TransactionEdit.State nextType = TransactionEdit.State.values()[random.nextInt(6)];
- long writePointer = Math.abs(random.nextLong());
- switch (nextType) {
- case INPROGRESS:
- edits.add(
- TransactionEdit.createStarted(writePointer, writePointer - 1,
- System.currentTimeMillis() + 300000L, TransactionType.SHORT));
- break;
- case COMMITTING:
- edits.add(TransactionEdit.createCommitting(writePointer, generateChangeSet(10)));
- break;
- case COMMITTED:
- edits.add(TransactionEdit.createCommitted(writePointer, generateChangeSet(10), writePointer + 1,
- random.nextBoolean()));
- break;
- case INVALID:
- edits.add(TransactionEdit.createInvalid(writePointer));
- break;
- case ABORTED:
- edits.add(TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null));
- break;
- case MOVE_WATERMARK:
- edits.add(TransactionEdit.createMoveWatermark(writePointer));
- break;
- }
- }
- return edits;
- }
-
- private static Set<ChangeId> generateChangeSet(int numEntries) {
- Set<ChangeId> changes = Sets.newHashSet();
- for (int i = 0; i < numEntries; i++) {
- byte[] bytes = new byte[8];
- random.nextBytes(bytes);
- changes.add(new ChangeId(bytes));
- }
- return changes;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java
deleted file mode 100644
index b8188e0..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import co.cask.tephra.Transaction;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test cases for {@link TxUtils} utility methods.
- */
-public class TxUtilsTest {
- @Test
- public void testMaxVisibleTimestamp() {
- // make sure we don't overflow with a MAX_VALUE write pointer: the result must still be Long.MAX_VALUE
- assertEquals(Long.MAX_VALUE, TxUtils.getMaxVisibleTimestamp(Transaction.ALL_VISIBLE_LATEST));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java b/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java
deleted file mode 100644
index 750fe28..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionConflictException;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-import com.google.common.base.Charsets;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * The following are all the possible cases when using {@link VisibilityFence}.
- *
- * In the below table,
- * "Read Txn" refers to the transaction that contains the read fence
- * "Before Write", "During Write" and "After Write" refer to the write transaction time
- * "Before Write Fence", "During Write Fence", "After Write Fence" refer to the write fence transaction time
- *
- * Timeline is: Before Write < During Write < After Write < Before Write Fence < During Write Fence <
- * After Write Fence
- *
- * +------+----------------------+----------------------+--------------------+--------------------+
- * | Case | Read Txn Start | Read Txn Commit | Conflict on Commit | Conflict on Commit |
- * | | | | of Read Txn | of Write Fence |
- * +------+----------------------+----------------------+--------------------+--------------------+
- * | 1 | Before Write | Before Write | No | No |
- * | 2 | Before Write | During Write | No | No |
- * | 3 | Before Write | After Write | No | No |
- * | 4 | Before Write | Before Write Fence | No | No |
- * | 5 | Before Write | During Write Fence | No | Yes |
- * | 6 | Before Write | After Write Fence | Yes | No |
- * | | | | | |
- * | 7 | During Write | During Write | No | No |
- * | 8 | During Write | After Write | No | No |
- * | 9 | During Write | Before Write Fence | No | No |
- * | 10 | During Write | During Write Fence | No | Yes |
- * | 11 | During Write | After Write Fence | Yes | No |
- * | | | | | |
- * | 12 | After Write | After Write | No | No |
- * | 13 | After Write | Before Write Fence | No | No |
- * | 14 | After Write | During Write Fence | No | Yes # |
- * | 15 | After Write | After Write Fence | Yes # | No |
- * | | | | | |
- * | 16 | Before Write Fence | Before Write Fence | No | No |
- * | 17 | Before Write Fence | During Write Fence | No | Yes # |
- * | 18 | Before Write Fence | After Write Fence | Yes # | No |
- * | | | | | |
- * | 19 | During Write Fence | During Write Fence | No | No |
- * | 20 | During Write Fence | After Write Fence | No | No |
- * | | | | | |
- * | 21 | After Write Fence | After Write Fence | No | No |
- * +------+----------------------+----------------------+--------------------+--------------------+
- *
- * Note: Cases marked with '#' in conflict column should not conflict, however current implementation causes
- * them to conflict. The remaining conflicts are a result of the fence.
- *
- * In the current implementation of VisibilityFence, read txns that start "Before Write", "During Write",
- * and "After Write" can be represented by read txns that start "Before Write Fence".
- * Verifying cases 16, 17, 18, 20 and 21 will effectively cover all other cases.
- */
-public class VisibilityFenceTest {
- private static Configuration conf = new Configuration();
-
- private static TransactionManager txManager = null; // shared by all tests; started in before(), stopped in after()
-
- @BeforeClass
- public static void before() {
- txManager = new TransactionManager(conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
- txManager.startAndWait();
- }
-
- @AfterClass
- public static void after() {
- txManager.stopAndWait();
- }
-
- @Test
- public void testFence1() throws Exception {
- byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
-
- // Writer updates data here in a separate transaction (code not shown)
- // start tx
- // update
- // commit tx
-
- // Readers use fence to indicate that they are interested in changes to specific data
- TransactionAware readFenceCase16 = VisibilityFence.create(fenceId);
- TransactionContext readTxContextCase16 =
- new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase16);
- readTxContextCase16.start();
- readTxContextCase16.finish(); // case 16: started and committed before the write fence starts
-
- TransactionAware readFenceCase17 = VisibilityFence.create(fenceId);
- TransactionContext readTxContextCase17 =
- new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase17);
- readTxContextCase17.start(); // case 17: started before the write fence, committed while it is in progress
-
- TransactionAware readFenceCase18 = VisibilityFence.create(fenceId);
- TransactionContext readTxContextCase18 =
- new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase18);
- readTxContextCase18.start(); // case 18: started before the write fence, committed after it
-
- // Now writer needs to wait for in-progress readers to see the change, it uses write fence to do so
- // Start write fence txn
- TransactionAware writeFence = new WriteFence(fenceId);
- TransactionContext writeTxContext = new TransactionContext(new InMemoryTxSystemClient(txManager), writeFence);
- writeTxContext.start();
-
- TransactionAware readFenceCase20 = VisibilityFence.create(fenceId);
- TransactionContext readTxContextCase20 =
- new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase20);
- readTxContextCase20.start(); // case 20: started during the write fence, committed after it
-
- readTxContextCase17.finish();
-
- assertTxnConflict(writeTxContext); // the write fence is expected to conflict after the case 17 commit above
- writeTxContext.start(); // retry the write fence in a new transaction
-
- // The write fence txn can commit without conflicts at this point
- writeTxContext.finish();
-
- TransactionAware readFenceCase21 = VisibilityFence.create(fenceId);
- TransactionContext readTxContextCase21 =
- new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase21);
- readTxContextCase21.start(); // case 21: started and committed after the write fence
-
- assertTxnConflict(readTxContextCase18); // case 18 is expected to conflict: the write fence committed in between
- readTxContextCase20.finish();
- readTxContextCase21.finish();
- }
-
- private void assertTxnConflict(TransactionContext txContext) throws Exception { // finish() must fail with a conflict
- try {
- txContext.finish();
- Assert.fail("Expected transaction to fail");
- } catch (TransactionConflictException e) {
- // Expected; abort the failed transaction
- txContext.abort();
- }
- }
-
- @Test
- public void testFence2() throws Exception {
- byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
-
- // Readers use fence to indicate that they are interested in changes to specific data
- // Reader 1
- TransactionAware readFence1 = VisibilityFence.create(fenceId);
- TransactionContext readTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence1);
- readTxContext1.start();
-
- // Reader 2
- TransactionAware readFence2 = VisibilityFence.create(fenceId);
- TransactionContext readTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence2);
- readTxContext2.start();
-
- // Reader 3
- TransactionAware readFence3 = VisibilityFence.create(fenceId);
- TransactionContext readTxContext3 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence3);
- readTxContext3.start();
-
- // Writer updates data here in a separate transaction (code not shown)
- // start tx
- // update
- // commit tx
-
- // Now writer needs to wait for readers 1, 2, and 3 to see the change, it uses write fence to do so
- TransactionAware writeFence = new WriteFence(fenceId);
- TransactionContext writeTxContext = new TransactionContext(new InMemoryTxSystemClient(txManager), writeFence);
- writeTxContext.start();
-
- // Reader 1 commits before writeFence is committed
- readTxContext1.finish();
-
- try {
- // writeFence will throw exception since Reader 1 committed without seeing changes
- writeTxContext.finish();
- Assert.fail("Expected transaction to fail");
- } catch (TransactionConflictException e) {
- // Expected
- writeTxContext.abort();
- }
-
- // Start over writeFence again
- writeTxContext.start();
-
- // Now, Reader 3 commits before writeFence
- // Note that Reader 3 does not conflict with Reader 1
- readTxContext3.finish();
-
- try {
- // writeFence will throw exception again since Reader 3 committed without seeing changes
- writeTxContext.finish();
- Assert.fail("Expected transaction to fail");
- } catch (TransactionConflictException e) {
- // Expected
- writeTxContext.abort();
- }
-
- // Start over writeFence again
- writeTxContext.start();
- // This time writeFence commits before the other readers
- writeTxContext.finish();
-
- // After this point all readers will see the change
-
- try {
- // Reader 2 commits after writeFence, hence this commit will throw an exception
- readTxContext2.finish();
- Assert.fail("Expected transaction to fail");
- } catch (TransactionConflictException e) {
- // Expected
- readTxContext2.abort();
- }
-
- // Reader 2 has to abort and start over again. It will see the changes now.
- readTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence2);
- readTxContext2.start();
- readTxContext2.finish();
- }
-
- @Test
- public void testFenceAwait() throws Exception {
- byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
-
- final TransactionContext fence1 = new TransactionContext(new InMemoryTxSystemClient(txManager),
- VisibilityFence.create(fenceId));
- fence1.start();
- final TransactionContext fence2 = new TransactionContext(new InMemoryTxSystemClient(txManager),
- VisibilityFence.create(fenceId));
- fence2.start();
- TransactionContext fence3 = new TransactionContext(new InMemoryTxSystemClient(txManager),
- VisibilityFence.create(fenceId));
- fence3.start();
-
- final AtomicInteger attempts = new AtomicInteger(); // counts startShort() calls made through customTxClient
- TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) {
- @Override
- public Transaction startShort() {
- Transaction transaction = super.startShort();
- try {
- switch (attempts.getAndIncrement()) { // commit one pending read fence on each of the first two calls
- case 0:
- fence1.finish();
- break;
- case 1:
- fence2.finish();
- break;
- case 2:
- break;
- default:
- throw new IllegalStateException("Unexpected state");
- }
- } catch (TransactionFailureException e) {
- Throwables.propagate(e); // propagate() always throws, so a failure never reaches the return below
- }
- return transaction;
- }
- };
-
- FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient);
- fenceWait.await(1000, TimeUnit.MILLISECONDS);
- Assert.assertEquals(3, attempts.get()); // startShort() was called three times during await()
-
- try {
- fence3.finish();
- Assert.fail("Expected transaction to fail");
- } catch (TransactionConflictException e) {
- // Expected exception
- fence3.abort();
- }
-
- fence3.start();
- fence3.finish();
- }
-
- @Test
- public void testFenceTimeout() throws Exception {
- byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
-
- final TransactionContext fence1 = new TransactionContext(new InMemoryTxSystemClient(txManager),
- VisibilityFence.create(fenceId));
- fence1.start();
-
- final long timeout = 100;
- final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
- final AtomicInteger attempts = new AtomicInteger();
- TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) {
- @Override
- public Transaction startShort() {
- Transaction transaction = super.startShort();
- try {
- switch (attempts.getAndIncrement()) {
- case 0:
- fence1.finish();
- break;
- }
- timeUnit.sleep(timeout + 1); // sleep just past the await timeout on every call
- } catch (InterruptedException | TransactionFailureException e) { // NOTE(review): interrupt flag is not restored
- Throwables.propagate(e);
- }
- return transaction;
- }
- };
-
- try {
- FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient);
- fenceWait.await(timeout, timeUnit);
- Assert.fail("Expected await to fail");
- } catch (TimeoutException e) {
- // Expected exception
- }
- Assert.assertEquals(1, attempts.get()); // only one startShort() call happened before the timeout
-
- FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient);
- fenceWait.await(timeout, timeUnit);
- Assert.assertEquals(2, attempts.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
new file mode 100644
index 0000000..28000ff
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs {@link TransactionSystemTest} against a {@link TransactionService} using in-memory ZooKeeper and storage.
+ */
+public class ThriftTransactionSystemTest extends TransactionSystemTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);
+
+ private static InMemoryZKServer zkServer;
+ private static ZKClientService zkClientService;
+ private static TransactionService txService;
+ private static TransactionStateStorage storage;
+ private static TransactionSystemClient txClient;
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void start() throws Exception {
+ zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+ zkServer.startAndWait();
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+ conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
+ conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+ conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+
+ Injector injector = Guice.createInjector(
+ new ConfigModule(conf),
+ new ZKModule(),
+ new DiscoveryModules().getDistributedModules(),
+ Modules.override(new TransactionModules().getDistributedModules())
+ .with(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+ }
+ }),
+ new TransactionClientModule()
+ );
+
+ zkClientService = injector.getInstance(ZKClientService.class);
+ zkClientService.startAndWait();
+
+ // start a tx server
+ txService = injector.getInstance(TransactionService.class);
+ storage = injector.getInstance(TransactionStateStorage.class);
+ txClient = injector.getInstance(TransactionSystemClient.class);
+ // Let a startup failure propagate: the tests cannot run without the service, so fail here in @BeforeClass.
+ LOG.info("Starting transaction service");
+ txService.startAndWait();
+ }
+
+ @Before
+ public void reset() throws Exception {
+ getClient().resetState(); // every test starts from a freshly reset transaction state
+ }
+
+ @AfterClass
+ public static void stop() throws Exception {
+ txService.stopAndWait();
+ storage.stopAndWait();
+ zkClientService.stopAndWait();
+ zkServer.stopAndWait();
+ }
+
+ @Override
+ protected TransactionSystemClient getClient() throws Exception {
+ return txClient;
+ }
+
+ @Override
+ protected TransactionStateStorage getStateStorage() throws Exception {
+ return storage;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java
new file mode 100644
index 0000000..9305229
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.concurrent.TimeUnit;
+
+public class TransactionAdminTest {
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionAdminTest.class);
+
+ private static Configuration conf;
+ private static InMemoryZKServer zkServer;
+ private static ZKClientService zkClientService;
+ private static TransactionService txService;
+ private static TransactionSystemClient txClient;
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void start() throws Exception {
+ zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+ zkServer.startAndWait();
+
+ conf = new Configuration();
+ conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+ conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
+ conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+ conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+
+ Injector injector = Guice.createInjector(
+ new ConfigModule(conf),
+ new ZKModule(),
+ new DiscoveryModules().getDistributedModules(),
+ Modules.override(new TransactionModules().getDistributedModules())
+ .with(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+ }
+ }),
+ new TransactionClientModule()
+ );
+
+ zkClientService = injector.getInstance(ZKClientService.class);
+ zkClientService.startAndWait();
+
+ // start a tx server
+ txService = injector.getInstance(TransactionService.class);
+ txClient = injector.getInstance(TransactionSystemClient.class);
+ try {
+ LOG.info("Starting transaction service");
+ txService.startAndWait();
+ } catch (Exception e) {
+ LOG.error("Failed to start service: ", e);
+ }
+ }
+
+ @Before
+ public void reset() throws Exception {
+ txClient.resetState();
+ }
+
+ @AfterClass
+ public static void stop() throws Exception {
+ txService.stopAndWait();
+ zkClientService.stopAndWait();
+ zkServer.stopAndWait();
+ }
+
+ @Test
+ public void testPrintUsage() throws Exception {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(out), new PrintStream(err));
+ int status = txAdmin.doMain(new String[0], conf);
+ Assert.assertEquals(1, status);
+ //noinspection ConstantConditions
+ Assert.assertTrue(err.toString("UTF-8").startsWith("Usage:"));
+ Assert.assertEquals(0, out.toByteArray().length);
+ }
+
+ @Test
+ public void testTruncateInvalidTx() throws Exception {
+ Transaction tx1 = txClient.startLong();
+ Transaction tx2 = txClient.startShort();
+ txClient.invalidate(tx1.getTransactionId());
+ txClient.invalidate(tx2.getTransactionId());
+ Assert.assertEquals(2, txClient.getInvalidSize());
+
+ TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(System.out), new PrintStream(System.err));
+ int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx", String.valueOf(tx2.getTransactionId())}, conf);
+ Assert.assertEquals(0, status);
+ Assert.assertEquals(1, txClient.getInvalidSize());
+ }
+
+ @Test
+ public void testTruncateInvalidTxBefore() throws Exception {
+ Transaction tx1 = txClient.startLong();
+ TimeUnit.MILLISECONDS.sleep(1);
+ long beforeTx2 = System.currentTimeMillis();
+ Transaction tx2 = txClient.startLong();
+
+ // Try before invalidation
+ Assert.assertEquals(0, txClient.getInvalidSize());
+ TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(System.out), new PrintStream(System.err));
+ int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf);
+ // Assert command failed due to in-progress transactions
+ Assert.assertEquals(1, status);
+ // Assert no change to invalid size
+ Assert.assertEquals(0, txClient.getInvalidSize());
+
+ txClient.invalidate(tx1.getTransactionId());
+ txClient.invalidate(tx2.getTransactionId());
+ Assert.assertEquals(2, txClient.getInvalidSize());
+
+ status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf);
+ Assert.assertEquals(0, status);
+ Assert.assertEquals(1, txClient.getInvalidSize());
+ }
+
+ @Test
+ public void testGetInvalidTxSize() throws Exception {
+ Transaction tx1 = txClient.startShort();
+ txClient.startLong();
+ txClient.invalidate(tx1.getTransactionId());
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(out), new PrintStream(err));
+ int status = txAdmin.doMain(new String[]{"--get-invalid-tx-size"}, conf);
+ Assert.assertEquals(0, status);
+ //noinspection ConstantConditions
+ Assert.assertTrue(out.toString("UTF-8").contains("Invalid list size: 1\n"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
new file mode 100644
index 0000000..20f6944
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
@@ -0,0 +1,676 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.snapshot.SnapshotCodecV4;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
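+ * Tests {@link TransactionContext}: the start/finish lifecycle it drives on its {@link TransactionAware}s, including commit failures and rollback.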
+ */
+public class TransactionContextTest {
+ // Scratch directory for the whole class; setup() creates the snapshot dir inside it.
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ // Instrumented client shared by every test; assigned once in setup().
+ private static DummyTxClient txClient;
+
+ /**
+  * Creates the shared {@link DummyTxClient} once for the class. The in-memory transaction
+  * modules are overridden so that a freshly started {@link TransactionManager} is bound and
+  * {@link TransactionSystemClient} resolves to a singleton {@link DummyTxClient}.
+  *
+  * @throws IOException if the temporary snapshot folder cannot be created
+  */
+ @BeforeClass
+ public static void setup() throws IOException {
+   final Configuration conf = new Configuration();
+   conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
+   conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+
+   // Bindings that replace the stock in-memory ones.
+   // NOTE(review): the manager started here is never stopped in the code visible in this
+   // class - confirm whether an @AfterClass shutdown is wanted.
+   AbstractModule testBindings = new AbstractModule() {
+     @Override
+     protected void configure() {
+       TransactionManager txManager = new TransactionManager(conf);
+       txManager.startAndWait();
+       bind(TransactionManager.class).toInstance(txManager);
+       bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class);
+     }
+   };
+
+   Injector injector = Guice.createInjector(
+     new ConfigModule(conf),
+     new DiscoveryModules().getInMemoryModules(),
+     Modules.override(new TransactionModules().getInMemoryModules()).with(testBindings));
+
+   TransactionSystemClient client = injector.getInstance(TransactionSystemClient.class);
+   txClient = (DummyTxClient) client;
+ }
+
+ // Change keys added to the transaction-awares by the tests.
+ static final byte[] A = new byte[] { 'a' };
+ static final byte[] B = new byte[] { 'b' };
+
+ // The two participants used by the tests.
+ final DummyTxAware ds1 = new DummyTxAware();
+ final DummyTxAware ds2 = new DummyTxAware();
+
+ /**
+  * Creates a {@link TransactionContext} backed by the shared test client.
+  *
+  * @param txAwares the participants to register with the context; may be empty
+  * @return a new context that has not been started
+  */
+ private static TransactionContext newTransactionContext(TransactionAware... txAwares) {
+   TransactionContext context = new TransactionContext(txClient, txAwares);
+   return context;
+ }
+
+ /**
+  * Clears the recorded lifecycle flags and pending changes of both participants before each test.
+  */
+ @Before
+ public void resetTxAwares() {
+   for (DummyTxAware txAware : new DummyTxAware[] { ds1, ds2 }) {
+     txAware.reset();
+   }
+ }
+
+ /**
+  * Happy path: both participants take part in a transaction that finishes normally, so each
+  * is started, asked for its changes, committed and post-committed, and neither is rolled back.
+  */
+ @Test
+ public void testSuccessful() throws TransactionFailureException, InterruptedException {
+   TransactionContext context = newTransactionContext(ds1, ds2);
+   // start transaction
+   context.start();
+   // add a change to ds1 and ds2
+   ds1.addChange(A);
+   ds2.addChange(B);
+   // commit transaction
+   context.finish();
+   // verify both are committed and post-committed
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds2.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds2.checked);
+   Assert.assertTrue(ds1.committed);
+   Assert.assertTrue(ds2.committed);
+   Assert.assertTrue(ds1.postCommitted);
+   Assert.assertTrue(ds2.postCommitted);
+   Assert.assertFalse(ds1.rolledBack);
+   Assert.assertFalse(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Committed, txClient.state);
+ }
+
+ /**
+  * {@code ds1} throws from its post-commit hook. {@code finish()} must report the failure with
+  * "post failure" as the cause message, but the transaction stays committed: both participants
+  * are committed and post-committed and neither is rolled back.
+  */
+ @Test
+ public void testPostCommitFailure() throws TransactionFailureException, InterruptedException {
+   ds1.failPostCommitTxOnce = InduceFailure.ThrowException;
+   TransactionContext context = newTransactionContext(ds1, ds2);
+   // start transaction
+   context.start();
+   // add a change to ds1 and ds2
+   ds1.addChange(A);
+   ds2.addChange(B);
+   // commit transaction should fail but without rollback as the failure happens post-commit
+   try {
+     context.finish();
+     Assert.fail("post commit failed - exception should be thrown");
+   } catch (TransactionFailureException e) {
+     Assert.assertEquals("post failure", e.getCause().getMessage());
+   }
+   // verify both are committed and post-committed
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds2.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds2.checked);
+   Assert.assertTrue(ds1.committed);
+   Assert.assertTrue(ds2.committed);
+   Assert.assertTrue(ds1.postCommitted);
+   Assert.assertTrue(ds2.postCommitted);
+   Assert.assertFalse(ds1.rolledBack);
+   Assert.assertFalse(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Committed, txClient.state);
+ }
+
+ /**
+  * {@code ds1} throws while persisting its changes. {@code finish()} must fail with
+  * "persist failure" as the cause message, {@code ds2} must never be asked to commit, both
+  * participants must be rolled back and the client must see the transaction aborted.
+  */
+ @Test
+ public void testPersistFailure() throws TransactionFailureException, InterruptedException {
+   ds1.failCommitTxOnce = InduceFailure.ThrowException;
+   TransactionContext context = newTransactionContext(ds1, ds2);
+   // start transaction
+   context.start();
+   // add a change to ds1 and ds2
+   ds1.addChange(A);
+   ds2.addChange(B);
+   // commit transaction should fail and cause rollback
+   try {
+     context.finish();
+     Assert.fail("Persist should have failed - exception should be thrown");
+   } catch (TransactionFailureException e) {
+     Assert.assertEquals("persist failure", e.getCause().getMessage());
+   }
+   // verify both are rolled back and tx is aborted
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds2.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds2.checked);
+   Assert.assertTrue(ds1.committed);
+   Assert.assertFalse(ds2.committed);
+   Assert.assertFalse(ds1.postCommitted);
+   Assert.assertFalse(ds2.postCommitted);
+   Assert.assertTrue(ds1.rolledBack);
+   Assert.assertTrue(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+ }
+
+ /**
+  * {@code ds1} returns {@code false} from its commit hook instead of throwing.
+  * {@code finish()} must fail with no cause attached, {@code ds2} must never be asked to
+  * commit, both participants must be rolled back and the transaction must end up aborted.
+  */
+ @Test
+ public void testPersistFalse() throws TransactionFailureException, InterruptedException {
+   ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+   TransactionContext context = newTransactionContext(ds1, ds2);
+   // start transaction
+   context.start();
+   // add a change to ds1 and ds2
+   ds1.addChange(A);
+   ds2.addChange(B);
+   // commit transaction should fail and cause rollback
+   try {
+     context.finish();
+     Assert.fail("Persist should have failed - exception should be thrown");
+   } catch (TransactionFailureException e) {
+     Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
+   }
+   // verify both are rolled back and tx is aborted
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds2.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds2.checked);
+   Assert.assertTrue(ds1.committed);
+   Assert.assertFalse(ds2.committed);
+   Assert.assertFalse(ds1.postCommitted);
+   Assert.assertFalse(ds2.postCommitted);
+   Assert.assertTrue(ds1.rolledBack);
+   Assert.assertTrue(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+ }
+
+ /**
+  * {@code ds1} throws both while persisting and while rolling back. {@code finish()} must
+  * report "persist failure" as the cause message, rollback must still reach {@code ds2}, and
+  * the client must see the transaction invalidated rather than aborted.
+  */
+ @Test
+ public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+   ds1.failCommitTxOnce = InduceFailure.ThrowException;
+   ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+   TransactionContext context = newTransactionContext(ds1, ds2);
+   // start transaction
+   context.start();
+   // add a change to ds1 and ds2
+   ds1.addChange(A);
+   ds2.addChange(B);
+   // commit transaction should fail and cause rollback
+   try {
+     context.finish();
+     Assert.fail("Persist should have failed - exception should be thrown");
+   } catch (TransactionFailureException e) {
+     Assert.assertEquals("persist failure", e.getCause().getMessage());
+   }
+   // verify both are rolled back and tx is invalidated
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds2.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds2.checked);
+   Assert.assertTrue(ds1.committed);
+   Assert.assertFalse(ds2.committed);
+   Assert.assertFalse(ds1.postCommitted);
+   Assert.assertFalse(ds2.postCommitted);
+   Assert.assertTrue(ds1.rolledBack);
+   Assert.assertTrue(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Invalidated, txClient.state);
+ }
+
+ /**
+  * {@code ds1} returns {@code false} from both its commit and its rollback hook.
+  * {@code finish()} must fail with no cause attached, rollback must still reach {@code ds2},
+  * and the client must see the transaction invalidated rather than aborted.
+  */
+ @Test
+ public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException {
+   ds1.failCommitTxOnce = InduceFailure.ReturnFalse;
+   ds1.failRollbackTxOnce = InduceFailure.ReturnFalse;
+   TransactionContext context = newTransactionContext(ds1, ds2);
+   // start transaction
+   context.start();
+   // add a change to ds1 and ds2
+   ds1.addChange(A);
+   ds2.addChange(B);
+   // commit transaction should fail and cause rollback
+   try {
+     context.finish();
+     Assert.fail("Persist should have failed - exception should be thrown");
+   } catch (TransactionFailureException e) {
+     Assert.assertNull(e.getCause()); // in this case, the ds simply returned false
+   }
+   // verify both are rolled back and tx is invalidated
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds2.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds2.checked);
+   Assert.assertTrue(ds1.committed);
+   Assert.assertFalse(ds2.committed);
+   Assert.assertFalse(ds1.postCommitted);
+   Assert.assertFalse(ds2.postCommitted);
+   Assert.assertTrue(ds1.rolledBack);
+   Assert.assertTrue(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Invalidated, txClient.state);
+ }
+
+ /**
+  * The client rejects the next {@code commit} call. By then both participants have already
+  * persisted their changes, so {@code finish()} must throw a
+  * {@link TransactionConflictException} without a cause, both participants must be rolled
+  * back and the transaction must end up aborted.
+  */
+ @Test
+ public void testCommitFalse() throws TransactionFailureException, InterruptedException {
+   txClient.failCommits = 1;
+   TransactionContext context = newTransactionContext(ds1, ds2);
+   // start transaction
+   context.start();
+   // add a change to ds1 and ds2
+   ds1.addChange(A);
+   ds2.addChange(B);
+   // commit transaction should fail and cause rollback
+   try {
+     context.finish();
+     Assert.fail("commit failed - exception should be thrown");
+   } catch (TransactionConflictException e) {
+     Assert.assertNull(e.getCause());
+   }
+   // verify both are rolled back and tx is aborted
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds2.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds2.checked);
+   Assert.assertTrue(ds1.committed);
+   Assert.assertTrue(ds2.committed);
+   Assert.assertFalse(ds1.postCommitted);
+   Assert.assertFalse(ds2.postCommitted);
+   Assert.assertTrue(ds1.rolledBack);
+   Assert.assertTrue(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+ }
+
+ /**
+  * The client rejects the next {@code canCommit} call. {@code finish()} must throw a
+  * {@link TransactionConflictException} without a cause before any participant persists
+  * its changes; both participants must be rolled back and the transaction aborted.
+  */
+ @Test
+ public void testCanCommitFalse() throws TransactionFailureException, InterruptedException {
+   txClient.failCanCommitOnce = true;
+   TransactionContext context = newTransactionContext(ds1, ds2);
+   // start transaction
+   context.start();
+   // add a change to ds1 and ds2
+   ds1.addChange(A);
+   ds2.addChange(B);
+   // commit transaction should fail and cause rollback
+   try {
+     context.finish();
+     Assert.fail("commit failed - exception should be thrown");
+   } catch (TransactionConflictException e) {
+     Assert.assertNull(e.getCause());
+   }
+   // verify both are rolled back and tx is aborted
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds2.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds2.checked);
+   Assert.assertFalse(ds1.committed);
+   Assert.assertFalse(ds2.committed);
+   Assert.assertFalse(ds1.postCommitted);
+   Assert.assertFalse(ds2.postCommitted);
+   Assert.assertTrue(ds1.rolledBack);
+   Assert.assertTrue(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+ }
+
+ /**
+  * {@code ds1} throws when asked for its changes and again when rolled back.
+  * {@code finish()} must report "changes failure" as the cause message, {@code ds2} must never
+  * be asked for its changes, nothing may be persisted, rollback must still reach both
+  * participants, and the client must see the transaction invalidated.
+  */
+ @Test
+ public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+   ds1.failChangesTxOnce = InduceFailure.ThrowException;
+   ds1.failRollbackTxOnce = InduceFailure.ThrowException;
+   TransactionContext context = newTransactionContext(ds1, ds2);
+   // start transaction
+   context.start();
+   // add a change to ds1 and ds2
+   ds1.addChange(A);
+   ds2.addChange(B);
+   // commit transaction should fail and cause rollback
+   try {
+     context.finish();
+     Assert.fail("get changes failed - exception should be thrown");
+   } catch (TransactionFailureException e) {
+     Assert.assertEquals("changes failure", e.getCause().getMessage());
+   }
+   // verify both are rolled back and tx is invalidated
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds2.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertFalse(ds2.checked);
+   Assert.assertFalse(ds1.committed);
+   Assert.assertFalse(ds2.committed);
+   Assert.assertFalse(ds1.postCommitted);
+   Assert.assertFalse(ds2.postCommitted);
+   Assert.assertTrue(ds1.rolledBack);
+   Assert.assertTrue(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Invalidated, txClient.state);
+ }
+
+ /**
+  * {@code ds1} throws from its start hook. {@code start()} must fail with "start failure" as
+  * the cause message, {@code ds2} must never be started, no participant may be rolled back,
+  * and the client must see the transaction aborted. Despite the method name, only the start
+  * failure is induced here; no rollback failure is configured.
+  */
+ @Test
+ public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException {
+   ds1.failStartTxOnce = InduceFailure.ThrowException;
+   TransactionContext context = newTransactionContext(ds1, ds2);
+   // start transaction
+   try {
+     context.start();
+     Assert.fail("start failed - exception should be thrown");
+   } catch (TransactionFailureException e) {
+     Assert.assertEquals("start failure", e.getCause().getMessage());
+   }
+   // verify neither is rolled back and tx is aborted
+   Assert.assertTrue(ds1.started);
+   Assert.assertFalse(ds2.started);
+   Assert.assertFalse(ds1.checked);
+   Assert.assertFalse(ds2.checked);
+   Assert.assertFalse(ds1.committed);
+   Assert.assertFalse(ds2.committed);
+   Assert.assertFalse(ds1.postCommitted);
+   Assert.assertFalse(ds2.postCommitted);
+   Assert.assertFalse(ds1.rolledBack);
+   Assert.assertFalse(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+ }
+
+ /**
+  * A participant added to the context after the transaction has started must be treated like
+  * one registered up front: both {@code ds1} and the late-added {@code ds2} are started,
+  * checked, committed and post-committed, and neither is rolled back.
+  */
+ @Test
+ public void testAddThenSuccess() throws TransactionFailureException, InterruptedException {
+   TransactionContext context = newTransactionContext(ds1);
+   // start transaction
+   context.start();
+   // add a change to ds1
+   ds1.addChange(A);
+   // add ds2 to the tx
+   context.addTransactionAware(ds2);
+   // add a change to ds2
+   ds2.addChange(B);
+   // commit transaction
+   context.finish();
+   // verify both are committed and post-committed
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds2.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds2.checked);
+   Assert.assertTrue(ds1.committed);
+   Assert.assertTrue(ds2.committed);
+   Assert.assertTrue(ds1.postCommitted);
+   Assert.assertTrue(ds2.postCommitted);
+   Assert.assertFalse(ds1.rolledBack);
+   Assert.assertFalse(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Committed, txClient.state);
+ }
+
+ /**
+  * A participant added after the transaction has started throws while persisting.
+  * {@code finish()} must report "persist failure" as the cause message; both {@code ds1} and
+  * the late-added {@code ds2} have had their commit hook invoked, both must be rolled back,
+  * and the transaction must end up aborted.
+  */
+ @Test
+ public void testAddThenFailure() throws TransactionFailureException, InterruptedException {
+   ds2.failCommitTxOnce = InduceFailure.ThrowException;
+
+   TransactionContext context = newTransactionContext(ds1);
+   // start transaction
+   context.start();
+   // add a change to ds1
+   ds1.addChange(A);
+   // add ds2 to the tx
+   context.addTransactionAware(ds2);
+   // add a change to ds2
+   ds2.addChange(B);
+   // commit transaction should fail and cause rollback
+   try {
+     context.finish();
+     Assert.fail("Persist should have failed - exception should be thrown");
+   } catch (TransactionFailureException e) {
+     Assert.assertEquals("persist failure", e.getCause().getMessage());
+   }
+   // verify both are rolled back and tx is aborted
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds2.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds2.checked);
+   Assert.assertTrue(ds1.committed);
+   Assert.assertTrue(ds2.committed);
+   Assert.assertFalse(ds1.postCommitted);
+   Assert.assertFalse(ds2.postCommitted);
+   Assert.assertTrue(ds1.rolledBack);
+   Assert.assertTrue(ds2.rolledBack);
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+ }
+
+ /**
+  * Checks the add/remove contract of the context: a participant can be added while a
+  * transaction is active, removing it during the active transaction throws
+  * {@link IllegalStateException}, removing it after {@code finish()} returns {@code true},
+  * and removing a participant that was never added returns {@code false}.
+  */
+ @Test
+ public void testAddThenRemoveSuccess() throws TransactionFailureException {
+   TransactionContext context = newTransactionContext();
+
+   context.start();
+   Assert.assertTrue(context.addTransactionAware(ds1));
+   ds1.addChange(A);
+
+   try {
+     context.removeTransactionAware(ds1);
+     Assert.fail("Removal of TransactionAware should fails when there is active transaction.");
+   } catch (IllegalStateException e) {
+     // Expected
+   }
+
+   context.finish();
+
+   Assert.assertTrue(context.removeTransactionAware(ds1));
+   // Removing a TransactionAware that was never added should return false
+   Assert.assertFalse(context.removeTransactionAware(ds2));
+
+   // Verify ds1 is committed and post-committed
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds1.committed);
+   Assert.assertTrue(ds1.postCommitted);
+   Assert.assertFalse(ds1.rolledBack);
+
+   // Verify nothing happened to ds2
+   Assert.assertFalse(ds2.started);
+   Assert.assertFalse(ds2.checked);
+   Assert.assertFalse(ds2.committed);
+   Assert.assertFalse(ds2.postCommitted);
+   Assert.assertFalse(ds2.rolledBack);
+
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Committed, txClient.state);
+ }
+
+ /**
+  * A participant added during the transaction throws while persisting. After the failed
+  * {@code finish()} the participant can be removed from the context, it has been rolled back,
+  * and the transaction is aborted.
+  */
+ @Test
+ public void testAndThenRemoveOnFailure() throws TransactionFailureException {
+   ds1.failCommitTxOnce = InduceFailure.ThrowException;
+   TransactionContext context = newTransactionContext();
+
+   context.start();
+   Assert.assertTrue(context.addTransactionAware(ds1));
+   ds1.addChange(A);
+
+   try {
+     context.finish();
+     Assert.fail("Persist should have failed - exception should be thrown");
+   } catch (TransactionFailureException e) {
+     Assert.assertEquals("persist failure", e.getCause().getMessage());
+   }
+
+   Assert.assertTrue(context.removeTransactionAware(ds1));
+
+   // Verify ds1 is rolled back
+   Assert.assertTrue(ds1.started);
+   Assert.assertTrue(ds1.checked);
+   Assert.assertTrue(ds1.committed);
+   Assert.assertFalse(ds1.postCommitted);
+   Assert.assertTrue(ds1.rolledBack);
+
+   // expected value first, so a failure message reports expected/actual the right way round
+   Assert.assertEquals(DummyTxClient.CommitState.Aborted, txClient.state);
+ }
+
+ enum InduceFailure { NoFailure, ReturnFalse, ThrowException }
+
+ static class DummyTxAware implements TransactionAware {
+
+ Transaction tx;
+ boolean started = false;
+ boolean committed = false;
+ boolean checked = false;
+ boolean rolledBack = false;
+ boolean postCommitted = false;
+ List<byte[]> changes = Lists.newArrayList();
+
+ InduceFailure failStartTxOnce = InduceFailure.NoFailure;
+ InduceFailure failChangesTxOnce = InduceFailure.NoFailure;
+ InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
+ InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
+ InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
+
+ void addChange(byte[] key) {
+ changes.add(key);
+ }
+
+ void reset() {
+ tx = null;
+ started = false;
+ checked = false;
+ committed = false;
+ rolledBack = false;
+ postCommitted = false;
+ changes.clear();
+ }
+
+ @Override
+ public void startTx(Transaction tx) {
+ reset();
+ started = true;
+ this.tx = tx;
+ if (failStartTxOnce == InduceFailure.ThrowException) {
+ failStartTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("start failure");
+ }
+ }
+
+ @Override
+ public void updateTx(Transaction tx) {
+ this.tx = tx;
+ }
+
+ @Override
+ public Collection<byte[]> getTxChanges() {
+ checked = true;
+ if (failChangesTxOnce == InduceFailure.ThrowException) {
+ failChangesTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("changes failure");
+ }
+ return ImmutableList.copyOf(changes);
+ }
+
+ @Override
+ public boolean commitTx() throws Exception {
+ committed = true;
+ if (failCommitTxOnce == InduceFailure.ThrowException) {
+ failCommitTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("persist failure");
+ }
+ if (failCommitTxOnce == InduceFailure.ReturnFalse) {
+ failCommitTxOnce = InduceFailure.NoFailure;
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void postTxCommit() {
+ postCommitted = true;
+ if (failPostCommitTxOnce == InduceFailure.ThrowException) {
+ failPostCommitTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("post failure");
+ }
+ }
+
+ @Override
+ public boolean rollbackTx() throws Exception {
+ rolledBack = true;
+ if (failRollbackTxOnce == InduceFailure.ThrowException) {
+ failRollbackTxOnce = InduceFailure.NoFailure;
+ throw new RuntimeException("rollback failure");
+ }
+ if (failRollbackTxOnce == InduceFailure.ReturnFalse) {
+ failRollbackTxOnce = InduceFailure.NoFailure;
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String getTransactionAwareName() {
+ return "dummy";
+ }
+ }
+
+ static class DummyTxClient extends InMemoryTxSystemClient {
+
+ boolean failCanCommitOnce = false;
+ int failCommits = 0;
+ enum CommitState {
+ Started, Committed, Aborted, Invalidated
+ }
+ CommitState state = CommitState.Started;
+
+ @Inject
+ DummyTxClient(TransactionManager txmgr) {
+ super(txmgr);
+ }
+
+ @Override
+ public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
+ if (failCanCommitOnce) {
+ failCanCommitOnce = false;
+ return false;
+ } else {
+ return super.canCommit(tx, changeIds);
+ }
+ }
+
+ @Override
+ public boolean commit(Transaction tx) throws TransactionNotInProgressException {
+ if (failCommits-- > 0) {
+ return false;
+ } else {
+ state = CommitState.Committed;
+ return super.commit(tx);
+ }
+ }
+
+ @Override
+ public Transaction startLong() {
+ state = CommitState.Started;
+ return super.startLong();
+ }
+
+ @Override
+ public Transaction startShort() {
+ state = CommitState.Started;
+ return super.startShort();
+ }
+
+ @Override
+ public Transaction startShort(int timeout) {
+ state = CommitState.Started;
+ return super.startShort(timeout);
+ }
+
+ @Override
+ public void abort(Transaction tx) {
+ state = CommitState.Aborted;
+ super.abort(tx);
+ }
+
+ @Override
+ public boolean invalidate(long tx) {
+ state = CommitState.Invalidated;
+ return super.invalidate(tx);
+ }
+ }
+}