You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:26 UTC
[26/56] [abbrv] [partial] incubator-tephra git commit: Rename package
to org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
new file mode 100644
index 0000000..628bced
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.util.TransactionEditUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Commons tests to run against the {@link TransactionStateStorage} implementations.
+ */
+public abstract class AbstractTransactionStateStorageTest {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionStateStorageTest.class);
+ private static Random random = new Random();
+
+ protected abstract Configuration getConfiguration(String testName) throws IOException;
+
+ protected abstract AbstractTransactionStateStorage getStorage(Configuration conf);
+
+ @Test
+ public void testSnapshotPersistence() throws Exception {
+ Configuration conf = getConfiguration("testSnapshotPersistence");
+
+ TransactionSnapshot snapshot = createRandomSnapshot();
+ TransactionStateStorage storage = getStorage(conf);
+ try {
+ storage.startAndWait();
+ storage.writeSnapshot(snapshot);
+
+ TransactionSnapshot readSnapshot = storage.getLatestSnapshot();
+ assertNotNull(readSnapshot);
+ assertEquals(snapshot, readSnapshot);
+ } finally {
+ storage.stopAndWait();
+ }
+ }
+
+ @Test
+ public void testLogWriteAndRead() throws Exception {
+ Configuration conf = getConfiguration("testLogWriteAndRead");
+
+ // create some random entries
+ List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(100);
+ TransactionStateStorage storage = getStorage(conf);
+ try {
+ long now = System.currentTimeMillis();
+ storage.startAndWait();
+ TransactionLog log = storage.createLog(now);
+ for (TransactionEdit edit : edits) {
+ log.append(edit);
+ }
+ log.close();
+
+ Collection<TransactionLog> logsToRead = storage.getLogsSince(now);
+ // should only be our one log
+ assertNotNull(logsToRead);
+ assertEquals(1, logsToRead.size());
+ TransactionLogReader logReader = logsToRead.iterator().next().getReader();
+ assertNotNull(logReader);
+
+ List<TransactionEdit> readEdits = Lists.newArrayListWithExpectedSize(edits.size());
+ TransactionEdit nextEdit;
+ while ((nextEdit = logReader.next()) != null) {
+ readEdits.add(nextEdit);
+ }
+ logReader.close();
+ assertEquals(edits.size(), readEdits.size());
+ for (int i = 0; i < edits.size(); i++) {
+ LOG.info("Checking edit " + i);
+ assertEquals(edits.get(i), readEdits.get(i));
+ }
+ } finally {
+ storage.stopAndWait();
+ }
+ }
+
+ @Test
+ public void testTransactionManagerPersistence() throws Exception {
+ Configuration conf = getConfiguration("testTransactionManagerPersistence");
+ conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
+ // start snapshot thread, but with long enough interval so we only get snapshots on shutdown
+ conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 600);
+
+ TransactionStateStorage storage = null;
+ TransactionStateStorage storage2 = null;
+ TransactionStateStorage storage3 = null;
+ try {
+ storage = getStorage(conf);
+ TransactionManager txManager = new TransactionManager
+ (conf, storage, new TxMetricsCollector());
+ txManager.startAndWait();
+
+ // TODO: replace with new persistence tests
+ final byte[] a = { 'a' };
+ final byte[] b = { 'b' };
+ // start a tx1, add a change A and commit
+ Transaction tx1 = txManager.startShort();
+ Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
+ Assert.assertTrue(txManager.commit(tx1));
+ // start a tx2 and add a change B
+ Transaction tx2 = txManager.startShort();
+ Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
+ // start a tx3
+ Transaction tx3 = txManager.startShort();
+ // restart
+ txManager.stopAndWait();
+ TransactionSnapshot origState = txManager.getCurrentState();
+ LOG.info("Orig state: " + origState);
+
+ Thread.sleep(100);
+ // starts a new tx manager
+ storage2 = getStorage(conf);
+ txManager = new TransactionManager(conf, storage2, new TxMetricsCollector());
+ txManager.startAndWait();
+
+ // check that the reloaded state matches the old
+ TransactionSnapshot newState = txManager.getCurrentState();
+ LOG.info("New state: " + newState);
+ assertEquals(origState, newState);
+
+ // commit tx2
+ Assert.assertTrue(txManager.commit(tx2));
+ // start another transaction, must be greater than tx3
+ Transaction tx4 = txManager.startShort();
+ Assert.assertTrue(tx4.getTransactionId() > tx3.getTransactionId());
+ // tx1 must be visble from tx2, but tx3 and tx4 must not
+ Assert.assertTrue(tx2.isVisible(tx1.getTransactionId()));
+ Assert.assertFalse(tx2.isVisible(tx3.getTransactionId()));
+ Assert.assertFalse(tx2.isVisible(tx4.getTransactionId()));
+ // add same change for tx3
+ Assert.assertFalse(txManager.canCommit(tx3, Collections.singleton(b)));
+ // check visibility with new xaction
+ Transaction tx5 = txManager.startShort();
+ Assert.assertTrue(tx5.isVisible(tx1.getTransactionId()));
+ Assert.assertTrue(tx5.isVisible(tx2.getTransactionId()));
+ Assert.assertFalse(tx5.isVisible(tx3.getTransactionId()));
+ Assert.assertFalse(tx5.isVisible(tx4.getTransactionId()));
+ // can commit tx3?
+ txManager.abort(tx3);
+ txManager.abort(tx4);
+ txManager.abort(tx5);
+ // start new tx and verify its exclude list is empty
+ Transaction tx6 = txManager.startShort();
+ Assert.assertFalse(tx6.hasExcludes());
+ txManager.abort(tx6);
+
+ // now start 5 x claim size transactions
+ Transaction tx = txManager.startShort();
+ for (int i = 1; i < 50; i++) {
+ tx = txManager.startShort();
+ }
+ origState = txManager.getCurrentState();
+
+ Thread.sleep(100);
+ // simulate crash by starting a new tx manager without a stopAndWait
+ storage3 = getStorage(conf);
+ txManager = new TransactionManager(conf, storage3, new TxMetricsCollector());
+ txManager.startAndWait();
+
+ // verify state again matches (this time should include WAL replay)
+ newState = txManager.getCurrentState();
+ assertEquals(origState, newState);
+
+ // get a new transaction and verify it is greater
+ Transaction txAfter = txManager.startShort();
+ Assert.assertTrue(txAfter.getTransactionId() > tx.getTransactionId());
+ } finally {
+ if (storage != null) {
+ storage.stopAndWait();
+ }
+ if (storage2 != null) {
+ storage2.stopAndWait();
+ }
+ if (storage3 != null) {
+ storage3.stopAndWait();
+ }
+ }
+ }
+
+ /**
+ * Tests whether the committed set is advanced properly on WAL replay.
+ */
+ @Test
+ public void testCommittedSetClearing() throws Exception {
+ Configuration conf = getConfiguration("testCommittedSetClearing");
+ conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
+ conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 0); // no periodic snapshots
+
+ TransactionStateStorage storage1 = null;
+ TransactionStateStorage storage2 = null;
+ try {
+ storage1 = getStorage(conf);
+ TransactionManager txManager = new TransactionManager
+ (conf, storage1, new TxMetricsCollector());
+ txManager.startAndWait();
+
+ // TODO: replace with new persistence tests
+ final byte[] a = { 'a' };
+ final byte[] b = { 'b' };
+ // start a tx1, add a change A and commit
+ Transaction tx1 = txManager.startShort();
+ Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
+ Assert.assertTrue(txManager.commit(tx1));
+ // start a tx2 and add a change B
+ Transaction tx2 = txManager.startShort();
+ Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
+ // start a tx3
+ Transaction tx3 = txManager.startShort();
+ TransactionSnapshot origState = txManager.getCurrentState();
+ LOG.info("Orig state: " + origState);
+
+ // simulate a failure by starting a new tx manager without stopping first
+ storage2 = getStorage(conf);
+ txManager = new TransactionManager(conf, storage2, new TxMetricsCollector());
+ txManager.startAndWait();
+
+ // check that the reloaded state matches the old
+ TransactionSnapshot newState = txManager.getCurrentState();
+ LOG.info("New state: " + newState);
+ assertEquals(origState, newState);
+
+ } finally {
+ if (storage1 != null) {
+ storage1.stopAndWait();
+ }
+ if (storage2 != null) {
+ storage2.stopAndWait();
+ }
+ }
+ }
+
+ /**
+ * Tests removal of old snapshots and old transaction logs.
+ */
+ @Test
+ public void testOldFileRemoval() throws Exception {
+ Configuration conf = getConfiguration("testOldFileRemoval");
+ TransactionStateStorage storage = null;
+ try {
+ storage = getStorage(conf);
+ storage.startAndWait();
+ long now = System.currentTimeMillis();
+ long writePointer = 1;
+ Collection<Long> invalid = Lists.newArrayList();
+ NavigableMap<Long, TransactionManager.InProgressTx> inprogress = Maps.newTreeMap();
+ Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
+ Map<Long, Set<ChangeId>> committed = Maps.newHashMap();
+ TransactionSnapshot snapshot = new TransactionSnapshot(now, 0, writePointer++, invalid,
+ inprogress, committing, committed);
+ TransactionEdit dummyEdit = TransactionEdit.createStarted(1, 0, Long.MAX_VALUE, TransactionType.SHORT);
+
+ // write snapshot 1
+ storage.writeSnapshot(snapshot);
+ TransactionLog log = storage.createLog(now);
+ log.append(dummyEdit);
+ log.close();
+
+ snapshot = new TransactionSnapshot(now + 1, 0, writePointer++, invalid, inprogress, committing, committed);
+ // write snapshot 2
+ storage.writeSnapshot(snapshot);
+ log = storage.createLog(now + 1);
+ log.append(dummyEdit);
+ log.close();
+
+ snapshot = new TransactionSnapshot(now + 2, 0, writePointer++, invalid, inprogress, committing, committed);
+ // write snapshot 3
+ storage.writeSnapshot(snapshot);
+ log = storage.createLog(now + 2);
+ log.append(dummyEdit);
+ log.close();
+
+ snapshot = new TransactionSnapshot(now + 3, 0, writePointer++, invalid, inprogress, committing, committed);
+ // write snapshot 4
+ storage.writeSnapshot(snapshot);
+ log = storage.createLog(now + 3);
+ log.append(dummyEdit);
+ log.close();
+
+ snapshot = new TransactionSnapshot(now + 4, 0, writePointer++, invalid, inprogress, committing, committed);
+ // write snapshot 5
+ storage.writeSnapshot(snapshot);
+ log = storage.createLog(now + 4);
+ log.append(dummyEdit);
+ log.close();
+
+ snapshot = new TransactionSnapshot(now + 5, 0, writePointer++, invalid, inprogress, committing, committed);
+ // write snapshot 6
+ storage.writeSnapshot(snapshot);
+ log = storage.createLog(now + 5);
+ log.append(dummyEdit);
+ log.close();
+
+ List<String> allSnapshots = storage.listSnapshots();
+ LOG.info("All snapshots: " + allSnapshots);
+ assertEquals(6, allSnapshots.size());
+ List<String> allLogs = storage.listLogs();
+ LOG.info("All logs: " + allLogs);
+ assertEquals(6, allLogs.size());
+
+ long oldestKept = storage.deleteOldSnapshots(3);
+ assertEquals(now + 3, oldestKept);
+ allSnapshots = storage.listSnapshots();
+ LOG.info("All snapshots: " + allSnapshots);
+ assertEquals(3, allSnapshots.size());
+
+ storage.deleteLogsOlderThan(oldestKept);
+ allLogs = storage.listLogs();
+ LOG.info("All logs: " + allLogs);
+ assertEquals(3, allLogs.size());
+ } finally {
+ if (storage != null) {
+ storage.stopAndWait();
+ }
+ }
+ }
+
+ @Test
+ public void testLongTxnEditReplay() throws Exception {
+ Configuration conf = getConfiguration("testLongTxnEditReplay");
+ TransactionStateStorage storage = null;
+ try {
+ storage = getStorage(conf);
+ storage.startAndWait();
+
+ // Create long running txns. Abort one of them, invalidate another, invalidate and abort the last.
+ long time1 = System.currentTimeMillis();
+ long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
+ TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG);
+ TransactionEdit edit2 = TransactionEdit.createAborted(wp1, TransactionType.LONG, null);
+
+ long time2 = time1 + 100;
+ long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
+ TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 100000, TransactionType.LONG);
+ TransactionEdit edit4 = TransactionEdit.createInvalid(wp2);
+
+ long time3 = time1 + 200;
+ long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
+ TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG);
+ TransactionEdit edit6 = TransactionEdit.createInvalid(wp3);
+ TransactionEdit edit7 = TransactionEdit.createAborted(wp3, TransactionType.LONG, null);
+
+ // write transaction edits
+ TransactionLog log = storage.createLog(time1);
+ log.append(edit1);
+ log.append(edit2);
+ log.append(edit3);
+ log.append(edit4);
+ log.append(edit5);
+ log.append(edit6);
+ log.append(edit7);
+ log.close();
+
+ // Start transaction manager
+ TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
+ txm.startAndWait();
+ try {
+ // Verify that all txns are in invalid list.
+ TransactionSnapshot snapshot1 = txm.getCurrentState();
+ assertEquals(ImmutableList.of(wp1, wp2, wp3), snapshot1.getInvalid());
+ assertEquals(0, snapshot1.getInProgress().size());
+ assertEquals(0, snapshot1.getCommittedChangeSets().size());
+ assertEquals(0, snapshot1.getCommittedChangeSets().size());
+ } finally {
+ txm.stopAndWait();
+ }
+ } finally {
+ if (storage != null) {
+ storage.stopAndWait();
+ }
+ }
+ }
+
+ @Test
+ public void testTruncateInvalidTxEditReplay() throws Exception {
+ Configuration conf = getConfiguration("testTruncateInvalidTxEditReplay");
+ TransactionStateStorage storage = null;
+ try {
+ storage = getStorage(conf);
+ storage.startAndWait();
+
+ // Create some txns, and invalidate all of them.
+ long time1 = System.currentTimeMillis();
+ long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
+ TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG);
+ TransactionEdit edit2 = TransactionEdit.createInvalid(wp1);
+
+ long time2 = time1 + 100;
+ long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
+ TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 10000, TransactionType.SHORT);
+ TransactionEdit edit4 = TransactionEdit.createInvalid(wp2);
+
+ long time3 = time1 + 2000;
+ long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
+ TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG);
+ TransactionEdit edit6 = TransactionEdit.createInvalid(wp3);
+
+ long time4 = time1 + 2100;
+ long wp4 = time4 * TxConstants.MAX_TX_PER_MS;
+ TransactionEdit edit7 = TransactionEdit.createStarted(wp4, wp4 - 10, time4 + 10000, TransactionType.SHORT);
+ TransactionEdit edit8 = TransactionEdit.createInvalid(wp4);
+
+ // remove wp1 and wp3 from invalid list
+ TransactionEdit edit9 = TransactionEdit.createTruncateInvalidTx(ImmutableSet.of(wp1, wp3));
+ // truncate invalid transactions before time3
+ TransactionEdit edit10 = TransactionEdit.createTruncateInvalidTxBefore(time3);
+
+ // write transaction edits
+ TransactionLog log = storage.createLog(time1);
+ log.append(edit1);
+ log.append(edit2);
+ log.append(edit3);
+ log.append(edit4);
+ log.append(edit5);
+ log.append(edit6);
+ log.append(edit7);
+ log.append(edit8);
+ log.append(edit9);
+ log.append(edit10);
+ log.close();
+
+ // Start transaction manager
+ TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
+ txm.startAndWait();
+ try {
+ // Only wp4 should be in invalid list.
+ TransactionSnapshot snapshot = txm.getCurrentState();
+ assertEquals(ImmutableList.of(wp4), snapshot.getInvalid());
+ assertEquals(0, snapshot.getInProgress().size());
+ assertEquals(0, snapshot.getCommittedChangeSets().size());
+ assertEquals(0, snapshot.getCommittedChangeSets().size());
+ } finally {
+ txm.stopAndWait();
+ }
+ } finally {
+ if (storage != null) {
+ storage.stopAndWait();
+ }
+ }
+ }
+
+ /**
+ * Generates a new snapshot object with semi-randomly populated values. This does not necessarily accurately
+ * represent a typical snapshot's distribution of values, as we only set an upper bound on pointer values.
+ *
+ * We generate a new snapshot with the contents:
+ * <ul>
+ * <li>readPointer = 1M + (random % 1M)</li>
+ * <li>writePointer = readPointer + 1000</li>
+ * <li>waterMark = writePointer + 1000</li>
+ * <li>inProgress = one each for (writePointer - 500)..writePointer, ~ 5% "long" transaction</li>
+ * <li>invalid = 100 randomly distributed, 0..1M</li>
+ * <li>committing = one each, (readPointer + 1)..(readPointer + 100)</li>
+ * <li>committed = one each, (readPointer - 1000)..readPointer</li>
+ * </ul>
+ * @return a new snapshot of transaction state.
+ */
+ private TransactionSnapshot createRandomSnapshot() {
+ // limit readPointer to a reasonable range, but make it > 1M so we can assign enough keys below
+ long readPointer = (Math.abs(random.nextLong()) % 1000000L) + 1000000L;
+ long writePointer = readPointer + 1000L;
+
+ // generate in progress -- assume last 500 write pointer values
+ NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
+ long startPointer = writePointer - 500L;
+ for (int i = 0; i < 500; i++) {
+ long currentTime = System.currentTimeMillis();
+ // make some "long" transactions
+ if (i % 20 == 0) {
+ inProgress.put(startPointer + i,
+ new TransactionManager.InProgressTx(startPointer - 1, currentTime + TimeUnit.DAYS.toSeconds(1),
+ TransactionType.LONG));
+ } else {
+ inProgress.put(startPointer + i,
+ new TransactionManager.InProgressTx(startPointer - 1, currentTime + 300000L,
+ TransactionType.SHORT));
+ }
+ }
+
+ // make 100 random invalid IDs
+ LongArrayList invalid = new LongArrayList();
+ for (int i = 0; i < 100; i++) {
+ invalid.add(Math.abs(random.nextLong()) % 1000000L);
+ }
+
+ // make 100 committing entries, 10 keys each
+ Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
+ for (int i = 0; i < 100; i++) {
+ committing.put(readPointer + i, generateChangeSet(10));
+ }
+
+ // make 1000 committed entries, 10 keys each
+ long startCommitted = readPointer - 1000L;
+ NavigableMap<Long, Set<ChangeId>> committed = Maps.newTreeMap();
+ for (int i = 0; i < 1000; i++) {
+ committed.put(startCommitted + i, generateChangeSet(10));
+ }
+
+ return new TransactionSnapshot(System.currentTimeMillis(), readPointer, writePointer,
+ invalid, inProgress, committing, committed);
+ }
+
+ private Set<ChangeId> generateChangeSet(int numEntries) {
+ Set<ChangeId> changes = Sets.newHashSet();
+ for (int i = 0; i < numEntries; i++) {
+ byte[] bytes = new byte[8];
+ random.nextBytes(bytes);
+ changes.add(new ChangeId(bytes));
+ }
+ return changes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java
new file mode 100644
index 0000000..22cdb59
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.tephra.TxConstants;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Unit Test for {@link CommitMarkerCodec}.
+ */
+public class CommitMarkerCodecTest {
+
+ @ClassRule
+ public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+ private static final String LOG_FILE = "txlog";
+ private static final Random RANDOM = new Random();
+
+ private static MiniDFSCluster dfsCluster;
+ private static Configuration conf;
+ private static FileSystem fs;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration hConf = new Configuration();
+ hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath());
+
+ dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
+ conf = new Configuration(dfsCluster.getFileSystem().getConf());
+ fs = FileSystem.newInstance(FileSystem.getDefaultUri(conf), conf);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ @Test
+ public void testRandomCommitMarkers() throws Exception {
+ List<Integer> randomInts = new ArrayList<>();
+ Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
+
+ // Write a bunch of random commit markers
+ try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
+ LongWritable.class,
+ SequenceFile.CompressionType.NONE)) {
+ for (int i = 0; i < 1000; i++) {
+ int randomNum = RANDOM.nextInt(Integer.MAX_VALUE);
+ CommitMarkerCodec.writeMarker(writer, randomNum);
+ randomInts.add(randomNum);
+ }
+ writer.hflush();
+ writer.hsync();
+ }
+
+ // Read the commit markers back to verify the marker
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
+ CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
+ for (int num : randomInts) {
+ Assert.assertEquals(num, markerCodec.readMarker(reader));
+ }
+ }
+ }
+
+ private static class IncompleteValueBytes implements SequenceFile.ValueBytes {
+
+ @Override
+ public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
+ // don't write anything to simulate incomplete record
+ }
+
+ @Override
+ public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
+ throw new IllegalArgumentException("Not possible");
+ }
+
+ @Override
+ public int getSize() {
+ return Ints.BYTES;
+ }
+ }
+
+ @Test
+ public void testIncompleteCommitMarker() throws Exception {
+ Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
+ try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
+ LongWritable.class,
+ SequenceFile.CompressionType.NONE)) {
+ String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED;
+ SequenceFile.ValueBytes valueBytes = new IncompleteValueBytes();
+ writer.appendRaw(key.getBytes(), 0, key.length(), valueBytes);
+ writer.hflush();
+ writer.hsync();
+ }
+
+ // Read the incomplete commit marker
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
+ CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
+ try {
+ markerCodec.readMarker(reader);
+ Assert.fail("Expected EOF Exception to be thrown");
+ } catch (EOFException e) {
+ // expected since we didn't write the value bytes
+ }
+ }
+ }
+
+ @Test
+ public void testIncorrectCommitMarker() throws Exception {
+ Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
+
+ // Write an incorrect marker
+ try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
+ LongWritable.class,
+ SequenceFile.CompressionType.NONE)) {
+ String invalidKey = "IncorrectKey";
+ SequenceFile.ValueBytes valueBytes = new CommitMarkerCodec.CommitEntriesCount(100);
+ writer.appendRaw(invalidKey.getBytes(), 0, invalidKey.length(), valueBytes);
+ writer.hflush();
+ writer.hsync();
+ }
+
+ // Read the commit markers back to verify the marker
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
+ CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
+ try {
+ markerCodec.readMarker(reader);
+ Assert.fail("Expected an IOException to be thrown");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
new file mode 100644
index 0000000..7b9f06b
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.io.Closeables;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.util.TransactionEditUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Testing for complete and partial sycs of {@link TransactionEdit} to {@link HDFSTransactionLog}
+ */
+public class HDFSTransactionLogTest {
+ @ClassRule
+ public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+ private static final String LOG_FILE_PREFIX = "txlog.";
+
+ private static MiniDFSCluster dfsCluster;
+ private static Configuration conf;
+ private static MetricsCollector metricsCollector;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration hConf = new Configuration();
+ hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath());
+
+ dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
+ conf = new Configuration(dfsCluster.getFileSystem().getConf());
+ metricsCollector = new TxMetricsCollector();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ private Configuration getConfiguration() throws IOException {
+ // tests should use the current user for HDFS
+ conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, TMP_FOLDER.newFolder().getAbsolutePath());
+ return conf;
+ }
+
+ private HDFSTransactionLog getHDFSTransactionLog(Configuration conf,
+ FileSystem fs, long timeInMillis) throws Exception {
+ String snapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
+ Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
+ return new HDFSTransactionLog(fs, conf, newLog, timeInMillis, metricsCollector);
+ }
+
+ private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs,
+ long timeInMillis, boolean withMarker) throws IOException {
+ String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
+ Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
+ SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+ if (withMarker) {
+ metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
+ new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
+ }
+ return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
+ TransactionEdit.class, SequenceFile.CompressionType.NONE, null, null, metadata);
+ }
+
+ private void writeNumWrites(SequenceFile.Writer writer, final int size) throws Exception {
+ String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED;
+ CommitMarkerCodec.writeMarker(writer, size);
+ }
+
+ private void testTransactionLogSync(int totalCount, int batchSize, boolean withMarker, boolean isComplete)
+ throws Exception {
+ List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
+ long timestamp = System.currentTimeMillis();
+ Configuration configuration = getConfiguration();
+ FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
+ SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, withMarker);
+ AtomicLong logSequence = new AtomicLong();
+ HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
+ AbstractTransactionLog.Entry entry;
+
+ for (int i = 0; i < totalCount - batchSize; i += batchSize) {
+ if (withMarker) {
+ writeNumWrites(writer, batchSize);
+ }
+ for (int j = 0; j < batchSize; j++) {
+ entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j));
+ writer.append(entry.getKey(), entry.getEdit());
+ }
+ writer.syncFs();
+ }
+
+ if (withMarker) {
+ writeNumWrites(writer, batchSize);
+ }
+
+ for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
+ entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
+ writer.append(entry.getKey(), entry.getEdit());
+ }
+
+ entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()),
+ edits.get(totalCount - 1));
+ if (isComplete) {
+ writer.append(entry.getKey(), entry.getEdit());
+ } else {
+ byte[] bytes = Longs.toByteArray(entry.getKey().get());
+ writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() {
+ @Override
+ public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
+ byte[] test = new byte[]{0x2};
+ outStream.write(test, 0, 1);
+ }
+
+ @Override
+ public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
+ // no-op
+ }
+
+ @Override
+ public int getSize() {
+ // mimic size longer than the actual byte array size written, so we would reach EOF
+ return 12;
+ }
+ });
+ }
+ writer.syncFs();
+ Closeables.closeQuietly(writer);
+
+ // now let's try to read this log
+ TransactionLogReader reader = transactionLog.getReader();
+ int syncedEdits = 0;
+ while (reader.next() != null) {
+ // testing reading the transaction edits
+ syncedEdits++;
+ }
+ if (isComplete) {
+ Assert.assertEquals(totalCount, syncedEdits);
+ } else {
+ Assert.assertEquals(totalCount - batchSize, syncedEdits);
+ }
+ }
+
+ @Test
+ public void testTransactionLogNewVersion() throws Exception {
+ // in-complete sync
+ testTransactionLogSync(1000, 1, true, false);
+ testTransactionLogSync(2000, 5, true, false);
+
+ // complete sync
+ testTransactionLogSync(1000, 1, true, true);
+ testTransactionLogSync(2000, 5, true, true);
+ }
+
+ @Test
+ public void testTransactionLogOldVersion() throws Exception {
+ // in-complete sync
+ testTransactionLogSync(1000, 1, false, false);
+
+ // complete sync
+ testTransactionLogSync(2000, 5, false, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java
new file mode 100644
index 0000000..f0eb9e5
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+
+/**
+ * Tests persistence of transaction snapshots and write-ahead logs to HDFS storage, using the
+ * {@link HDFSTransactionStateStorage} and {@link HDFSTransactionLog} implementations.
+ */
+public class HDFSTransactionStateStorageTest extends AbstractTransactionStateStorageTest {
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ private static MiniDFSCluster dfsCluster;
+ private static Configuration conf;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration hConf = new Configuration();
+ hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
+
+ dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
+ conf = new Configuration(dfsCluster.getFileSystem().getConf());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ @Override
+ protected Configuration getConfiguration(String testName) throws IOException {
+ // tests should use the current user for HDFS
+ conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+ return conf;
+ }
+
+ @Override
+ protected AbstractTransactionStateStorage getStorage(Configuration conf) {
+ return new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java b/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java
new file mode 100644
index 0000000..366542b
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+
+/**
+ * Stores the latest transaction snapshot and logs in memory.
+ */
+public class InMemoryTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
+ // only keeps the most recent snapshot in memory
+ private TransactionSnapshot lastSnapshot;
+
+ private NavigableMap<Long, TransactionLog> logs = new TreeMap<>();
+
+ @Override
+ protected void startUp() throws Exception {
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ lastSnapshot = null;
+ logs = new TreeMap<>();
+ }
+
+ @Override
+ public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
+ // no codecs in in-memory mode
+ }
+
+ @Override
+ public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
+ lastSnapshot = snapshot;
+ }
+
+ @Override
+ public TransactionSnapshot getLatestSnapshot() throws IOException {
+ return lastSnapshot;
+ }
+
+ @Override
+ public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
+ return lastSnapshot;
+ }
+
+ @Override
+ public long deleteOldSnapshots(int numberToKeep) throws IOException {
+ // always only keep the last snapshot
+ return lastSnapshot.getTimestamp();
+ }
+
+ @Override
+ public List<String> listSnapshots() throws IOException {
+ List<String> snapshots = Lists.newArrayListWithCapacity(1);
+ if (lastSnapshot != null) {
+ snapshots.add(Long.toString(lastSnapshot.getTimestamp()));
+ }
+ return snapshots;
+ }
+
+ @Override
+ public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
+ return Lists.newArrayList(logs.tailMap(timestamp).values());
+ }
+
+ @Override
+ public TransactionLog createLog(long timestamp) throws IOException {
+ TransactionLog log = new InMemoryTransactionLog(timestamp);
+ logs.put(timestamp, log);
+ return log;
+ }
+
+ @Override
+ public void deleteLogsOlderThan(long timestamp) throws IOException {
+ Iterator<Map.Entry<Long, TransactionLog>> logIter = logs.entrySet().iterator();
+ while (logIter.hasNext()) {
+ Map.Entry<Long, TransactionLog> logEntry = logIter.next();
+ if (logEntry.getKey() < timestamp) {
+ logIter.remove();
+ }
+ }
+ }
+
+ @Override
+ public void setupStorage() throws IOException {
+ }
+
+ @Override
+ public List<String> listLogs() throws IOException {
+ return Lists.transform(Lists.newArrayList(logs.keySet()), new Function<Long, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable Long input) {
+ return input.toString();
+ }
+ });
+ }
+
+ @Override
+ public String getLocation() {
+ return "in-memory";
+ }
+
+ public static class InMemoryTransactionLog implements TransactionLog {
+ private long timestamp;
+ private List<TransactionEdit> edits = Lists.newArrayList();
+ boolean isClosed = false;
+ public InMemoryTransactionLog(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String getName() {
+ return "in-memory@" + timestamp;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public void append(TransactionEdit edit) throws IOException {
+ if (isClosed) {
+ throw new IOException("Log is closed");
+ }
+ edits.add(edit);
+ }
+
+ @Override
+ public void append(List<TransactionEdit> edits) throws IOException {
+ if (isClosed) {
+ throw new IOException("Log is closed");
+ }
+ edits.addAll(edits);
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ @Override
+ public TransactionLogReader getReader() throws IOException {
+ return new InMemoryLogReader(edits.iterator());
+ }
+ }
+
+ public static class InMemoryLogReader implements TransactionLogReader {
+ private final Iterator<TransactionEdit> editIterator;
+
+ public InMemoryLogReader(Iterator<TransactionEdit> editIterator) {
+ this.editIterator = editIterator;
+ }
+
+ @Override
+ public TransactionEdit next() throws IOException {
+ if (editIterator.hasNext()) {
+ return editIterator.next();
+ }
+ return null;
+ }
+
+ @Override
+ public TransactionEdit next(TransactionEdit reuse) throws IOException {
+ return next();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java
new file mode 100644
index 0000000..9535102
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.snapshot.DefaultSnapshotCodec;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.snapshot.SnapshotCodecV4;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Runs transaction persistence tests against the {@link LocalFileTransactionStateStorage} and
+ * {@link LocalFileTransactionLog} implementations.
+ */
+public class LocalTransactionStateStorageTest extends AbstractTransactionStateStorageTest {
+ @ClassRule
+ public static TemporaryFolder tmpDir = new TemporaryFolder();
+
+ @Override
+ protected Configuration getConfiguration(String testName) throws IOException {
+ File testDir = tmpDir.newFolder(testName);
+ Configuration conf = new Configuration();
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+ conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
+ return conf;
+ }
+
+ @Override
+ protected AbstractTransactionStateStorage getStorage(Configuration conf) {
+ return new LocalFileTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
+ }
+
+ // v2 TransactionEdit
+ @SuppressWarnings("deprecation")
+ private class TransactionEditV2 extends TransactionEdit {
+ public TransactionEditV2(long writePointer, long visibilityUpperBound, State state, long expirationDate,
+ Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type) {
+ super(writePointer, visibilityUpperBound, state, expirationDate, changes, commitPointer, canCommit, type,
+ null, 0L, 0L, null);
+ }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ TransactionEditCodecs.encode(this, out, new TransactionEditCodecs.TransactionEditCodecV2());
+ }
+ }
+
+ // Note: this test cannot run in AbstractTransactionStateStorageTest, since SequenceFile throws exception saying
+ // TransactionEditV2 is not TransactionEdit. Since the code path this test is verifying is the same path between
+ // HDFS and Local Storage, having this only over here is fine.
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testLongTxnBackwardsCompatibility() throws Exception {
+ Configuration conf = getConfiguration("testLongTxnBackwardsCompatibility");
+
+ // Use SnapshotCodec version 1
+ String latestSnapshotCodec = conf.get(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
+ conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+
+ TransactionStateStorage storage = null;
+ try {
+ storage = getStorage(conf);
+ storage.startAndWait();
+
+ // Create transaction snapshot and transaction edits with version when long running txns had -1 expiration.
+ Collection<Long> invalid = Lists.newArrayList();
+ NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
+ long time1 = System.currentTimeMillis();
+ long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
+ inProgress.put(wp1, new TransactionManager.InProgressTx(wp1 - 5, -1L));
+ long time2 = time1 + 100;
+ long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
+ inProgress.put(wp2, new TransactionManager.InProgressTx(wp2 - 50, time2 + 1000));
+ Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
+ Map<Long, Set<ChangeId>> committed = Maps.newHashMap();
+ TransactionSnapshot snapshot = new TransactionSnapshot(time2, 0, wp2, invalid,
+ inProgress, committing, committed);
+ long time3 = time1 + 200;
+ long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
+ TransactionEdit edit1 = new TransactionEditV2(wp3, wp3 - 10, TransactionEdit.State.INPROGRESS, -1L,
+ null, 0L, false, null);
+ long time4 = time1 + 300;
+ long wp4 = time4 * TxConstants.MAX_TX_PER_MS;
+ TransactionEdit edit2 = new TransactionEditV2(wp4, wp4 - 10, TransactionEdit.State.INPROGRESS, time4 + 1000,
+ null, 0L, false, null);
+
+ // write snapshot and transaction edit
+ storage.writeSnapshot(snapshot);
+ TransactionLog log = storage.createLog(time2);
+ log.append(edit1);
+ log.append(edit2);
+ log.close();
+
+ // Start transaction manager
+ conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, latestSnapshotCodec);
+ long longTimeout = TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.Manager.CFG_TX_LONG_TIMEOUT,
+ TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT));
+ TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
+ txm.startAndWait();
+ try {
+ // Verify that the txns in old format were read correctly.
+ // There should be four in-progress transactions, and no invalid transactions
+ TransactionSnapshot snapshot1 = txm.getCurrentState();
+ Assert.assertEquals(ImmutableSortedSet.of(wp1, wp2, wp3, wp4), snapshot1.getInProgress().keySet());
+ verifyInProgress(snapshot1.getInProgress().get(wp1), TransactionType.LONG, time1 + longTimeout);
+ verifyInProgress(snapshot1.getInProgress().get(wp2), TransactionType.SHORT, time2 + 1000);
+ verifyInProgress(snapshot1.getInProgress().get(wp3), TransactionType.LONG, time3 + longTimeout);
+ verifyInProgress(snapshot1.getInProgress().get(wp4), TransactionType.SHORT, time4 + 1000);
+ Assert.assertEquals(0, snapshot1.getInvalid().size());
+ } finally {
+ txm.stopAndWait();
+ }
+ } finally {
+ if (storage != null) {
+ storage.stopAndWait();
+ }
+ }
+ }
+
+ // Note: this test cannot run in AbstractTransactionStateStorageTest, since SequenceFile throws exception saying
+ // TransactionEditV2 is not TransactionEdit. Since the code path this test is verifying is the same path between
+ // HDFS and Local Storage, having this only over here is fine.
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testAbortEditBackwardsCompatibility() throws Exception {
+ Configuration conf = getConfiguration("testAbortEditBackwardsCompatibility");
+
+ TransactionStateStorage storage = null;
+ try {
+ storage = getStorage(conf);
+ storage.startAndWait();
+
+ // Create edits for transaction type addition to abort
+ long time1 = System.currentTimeMillis();
+ long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
+ TransactionEdit edit1 = new TransactionEditV2(wp1, wp1 - 10, TransactionEdit.State.INPROGRESS, -1L,
+ null, 0L, false, null);
+ TransactionEdit edit2 = new TransactionEditV2(wp1, 0L, TransactionEdit.State.ABORTED, 0L,
+ null, 0L, false, null);
+
+ long time2 = time1 + 400;
+ long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
+ TransactionEdit edit3 = new TransactionEditV2(wp2, wp2 - 10, TransactionEdit.State.INPROGRESS, time2 + 10000,
+ null, 0L, false, null);
+ TransactionEdit edit4 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.INVALID, 0L, null, 0L, false, null);
+ // Simulate case where we cannot determine txn state during abort
+ TransactionEdit edit5 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.ABORTED, 0L, null, 0L, false, null);
+
+ // write snapshot and transaction edit
+ TransactionLog log = storage.createLog(time1);
+ log.append(edit1);
+ log.append(edit2);
+ log.append(edit3);
+ log.append(edit4);
+ log.append(edit5);
+ log.close();
+
+ // Start transaction manager
+ TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
+ txm.startAndWait();
+ try {
+ // Verify that the txns in old format were read correctly.
+ // Both transactions should be in invalid state
+ TransactionSnapshot snapshot1 = txm.getCurrentState();
+ Assert.assertEquals(ImmutableList.of(wp1, wp2), snapshot1.getInvalid());
+ Assert.assertEquals(0, snapshot1.getInProgress().size());
+ Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size());
+ Assert.assertEquals(0, snapshot1.getCommittingChangeSets().size());
+ } finally {
+ txm.stopAndWait();
+ }
+ } finally {
+ if (storage != null) {
+ storage.stopAndWait();
+ }
+ }
+ }
+
+ private void verifyInProgress(TransactionManager.InProgressTx inProgressTx, TransactionType type,
+ long expiration) throws Exception {
+ Assert.assertEquals(type, inProgressTx.getType());
+ Assert.assertTrue(inProgressTx.getExpiration() == expiration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java
new file mode 100644
index 0000000..46cb81c
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * test for {@link TransactionEdit}
+ */
+public class TransactionEditTest {
+ private static final byte[] COL = new byte[] {'c'};
+
+ @Test
+ public void testV1SerdeCompat() throws Exception {
+ TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV1();
+ // start tx edit and committed tx edit cover all fields of tx edit
+ // NOTE: set visibilityUpperBound to 0 and transaction type to null as this is expected default
+ // for decoding older versions that doesn't store it
+ verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 0L, 1000L, null), olderCodec);
+ verifyDecodingSupportsOlderVersion(
+ TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec);
+ }
+
+ @Test
+ public void testV2SerdeCompat() throws Exception {
+ TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV2();
+ // start tx edit and committed tx edit cover all fields of tx edit
+ // NOTE: transaction type to null as this is expected default for decoding older versions that doesn't store it
+ verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 100L, 1000L, null), olderCodec);
+ verifyDecodingSupportsOlderVersion(
+ TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec);
+ }
+
+ @SuppressWarnings("deprecation")
+ private void verifyDecodingSupportsOlderVersion(TransactionEdit edit,
+ TransactionEditCodecs.TransactionEditCodec olderCodec)
+ throws IOException {
+ // encoding with older version of codec
+ ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ TransactionEditCodecs.encode(edit, out, olderCodec);
+
+ // decoding
+ TransactionEdit decodedEdit = new TransactionEdit();
+ DataInput in = ByteStreams.newDataInput(out.toByteArray());
+ decodedEdit.readFields(in);
+
+ Assert.assertEquals(edit, decodedEdit);
+ }
+
+ @Test
+ public void testSerialization() throws Exception {
+ assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[0]));
+ assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[]{ 2L, 3L }));
+ assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[0]));
+ assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[]{ 2L, 3L }));
+
+ assertSerializedEdit(TransactionEdit.createCheckpoint(2L, 1L));
+
+ assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, false));
+ assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, true));
+ assertSerializedEdit(TransactionEdit.createCommitted(1L,
+ Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'})), 2L, false));
+ assertSerializedEdit(TransactionEdit.createCommitted(1L,
+ Sets.newHashSet(new ChangeId(new byte[]{ 'a', 'b', 'c' }), new ChangeId(new byte[]{ 'd', 'e', 'f' })),
+ 2L, true));
+
+ assertSerializedEdit(TransactionEdit.createCommitting(1L, Sets.<ChangeId>newHashSet()));
+ assertSerializedEdit(TransactionEdit.createCommitting(1L,
+ Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'}))));
+ assertSerializedEdit(TransactionEdit.createCommitting(1L,
+ Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'}), new ChangeId(new byte[]{'d', 'e', 'f'}))));
+
+ assertSerializedEdit(TransactionEdit.createInvalid(1L));
+
+ assertSerializedEdit(TransactionEdit.createMoveWatermark(10L));
+
+ assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 1000,
+ TransactionType.SHORT));
+ assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 10000,
+ TransactionType.LONG));
+
+ assertSerializedEdit(TransactionEdit.createTruncateInvalidTx(Sets.newHashSet(new Long(1))));
+ assertSerializedEdit(TransactionEdit.createTruncateInvalidTx(
+ Sets.newHashSet(new Long(1), new Long(2), new Long(3))));
+
+ assertSerializedEdit(TransactionEdit.createTruncateInvalidTxBefore(System.currentTimeMillis()));
+ }
+
+ private void assertSerializedEdit(TransactionEdit originalEdit) throws IOException {
+ ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ originalEdit.write(out);
+
+ TransactionEdit decodedEdit = new TransactionEdit();
+ DataInput in = ByteStreams.newDataInput(out.toByteArray());
+ decodedEdit.readFields(in);
+
+ Assert.assertEquals(originalEdit, decodedEdit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
new file mode 100644
index 0000000..afdff5c
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionModules;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests related to {@link SnapshotCodec} implementations.
+ */
+public class SnapshotCodecTest {
+ @ClassRule
+ public static TemporaryFolder tmpDir = new TemporaryFolder();
+
+ @Test
+ public void testMinimalDeserilization() throws Exception {
+ long now = System.currentTimeMillis();
+ long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
+ /*
+ * Snapshot consisting of transactions at:
+ */
+ long tInvalid = nowWritePointer - 5; // t1 - invalid
+ long readPtr = nowWritePointer - 4; // t2 - here and earlier committed
+ long tLong = nowWritePointer - 3; // t3 - in-progress LONG
+ long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2)
+ long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
+
+ TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
+ tLong, new TransactionManager.InProgressTx(readPtr,
+ TransactionManager.getTxExpirationFromWritePointer(
+ tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
+ TransactionType.LONG),
+ tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
+
+ TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
+ Lists.newArrayList(tInvalid), // invalid
+ inProgress, ImmutableMap.<Long, Set<ChangeId>>of(
+ tShort, Sets.<ChangeId>newHashSet()),
+ ImmutableMap.<Long, Set<ChangeId>>of(
+ tCommitted, Sets.<ChangeId>newHashSet()));
+
+ Configuration conf1 = new Configuration();
+ conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
+ SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
+
+ byte[] byteArray;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ provider1.encode(out, snapshot);
+ byteArray = out.toByteArray();
+ }
+
+ // TransactionSnapshot and TransactionVisibilityState decode should pass now
+ TransactionSnapshot txSnapshot = provider1.decode(new ByteArrayInputStream(byteArray));
+ TransactionVisibilityState txVisibilityState =
+ provider1.decodeTransactionVisibilityState(new ByteArrayInputStream(byteArray));
+ assertTransactionVisibilityStateEquals(txSnapshot, txVisibilityState);
+
+ // Corrupt the serialization byte array so that full deserialization will fail
+ byteArray[byteArray.length - 1] = 'a';
+
+ // TransactionVisibilityState decoding should pass since it doesn't decode the committing and committed changesets.
+ TransactionVisibilityState txVisibilityState2 = provider1.decodeTransactionVisibilityState(
+ new ByteArrayInputStream(byteArray));
+ Assert.assertNotNull(txVisibilityState2);
+ Assert.assertEquals(txVisibilityState, txVisibilityState2);
+ Assert.assertEquals(readPtr, txVisibilityState2.getReadPointer());
+ try {
+ provider1.decode(new ByteArrayInputStream(byteArray));
+ Assert.fail();
+ } catch (RuntimeException e) {
+ // expected since we modified the serialization bytes
+ }
+ }
+
+ /**
+ * In-progress LONG transactions written with DefaultSnapshotCodec will not have the type serialized as part of
+ * the data. Since these transactions also contain a non-negative expiration, we need to ensure we reset the type
+ * correctly when the snapshot is loaded.
+ */
+ @Test
+ public void testDefaultToV3Compatibility() throws Exception {
+ long now = System.currentTimeMillis();
+ long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
+ /*
+ * Snapshot consisting of transactions at:
+ */
+ long tInvalid = nowWritePointer - 5; // t1 - invalid
+ long readPtr = nowWritePointer - 4; // t2 - here and earlier committed
+ long tLong = nowWritePointer - 3; // t3 - in-progress LONG
+ long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2)
+ long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
+
+ TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
+ tLong, new TransactionManager.InProgressTx(readPtr,
+ TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
+ TransactionType.LONG),
+ tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
+
+ TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
+ Lists.newArrayList(tInvalid), // invalid
+ inProgress,
+ ImmutableMap.<Long, Set<ChangeId>>of(
+ tShort, Sets.newHashSet(new ChangeId(new byte[]{'r', '3'}), new ChangeId(new byte[]{'r', '4'}))),
+ ImmutableMap.<Long, Set<ChangeId>>of(
+ tCommitted, Sets.newHashSet(new ChangeId(new byte[]{'r', '1'}), new ChangeId(new byte[]{'r', '2'}))));
+
+ Configuration conf1 = new Configuration();
+ conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+ SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ provider1.encode(out, snapshot);
+ } finally {
+ out.close();
+ }
+
+ TransactionSnapshot snapshot2 = provider1.decode(new ByteArrayInputStream(out.toByteArray()));
+ TransactionVisibilityState minTxSnapshot = provider1.decodeTransactionVisibilityState(
+ new ByteArrayInputStream(out.toByteArray()));
+ assertTransactionVisibilityStateEquals(snapshot2, minTxSnapshot);
+
+ assertEquals(snapshot.getReadPointer(), snapshot2.getReadPointer());
+ assertEquals(snapshot.getWritePointer(), snapshot2.getWritePointer());
+ assertEquals(snapshot.getInvalid(), snapshot2.getInvalid());
+ // in-progress transactions will have missing types
+ assertNotEquals(snapshot.getInProgress(), snapshot2.getInProgress());
+ assertEquals(snapshot.getCommittingChangeSets(), snapshot2.getCommittingChangeSets());
+ assertEquals(snapshot.getCommittedChangeSets(), snapshot2.getCommittedChangeSets());
+
+ // after fixing in-progress, full snapshot should match
+ Map<Long, TransactionManager.InProgressTx> fixedInProgress = TransactionManager.txnBackwardsCompatCheck(
+ TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT, 10000L, snapshot2.getInProgress());
+ assertEquals(snapshot.getInProgress(), fixedInProgress);
+ assertEquals(snapshot, snapshot2);
+ }
+
+ /**
+ * Test full stack serialization for a TransactionManager migrating from DefaultSnapshotCodec to SnapshotCodecV3.
+ */
+ @Test
+ public void testDefaultToV3Migration() throws Exception {
+ File testDir = tmpDir.newFolder("testDefaultToV3Migration");
+ Configuration conf = new Configuration();
+ conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+
+ Injector injector = Guice.createInjector(new ConfigModule(conf),
+ new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
+
+ TransactionManager txManager = injector.getInstance(TransactionManager.class);
+ txManager.startAndWait();
+
+ txManager.startLong();
+
+ // shutdown to force a snapshot
+ txManager.stopAndWait();
+
+ TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
+ txStorage.startAndWait();
+
+ // confirm that the in-progress entry is missing a type
+ TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
+ TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
+ assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
+ assertNotNull(snapshot);
+ assertEquals(1, snapshot.getInProgress().size());
+ Map.Entry<Long, TransactionManager.InProgressTx> entry =
+ snapshot.getInProgress().entrySet().iterator().next();
+ assertNull(entry.getValue().getType());
+ txStorage.stopAndWait();
+
+
+ // start a new Tx manager to test fixup
+ Configuration conf2 = new Configuration();
+ conf2.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+ conf2.setStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES,
+ DefaultSnapshotCodec.class.getName(), SnapshotCodecV3.class.getName());
+ Injector injector2 = Guice.createInjector(new ConfigModule(conf2),
+ new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
+
+ TransactionManager txManager2 = injector2.getInstance(TransactionManager.class);
+ txManager2.startAndWait();
+
+ // state should be recovered
+ TransactionSnapshot snapshot2 = txManager2.getCurrentState();
+ assertEquals(1, snapshot2.getInProgress().size());
+ Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx =
+ snapshot2.getInProgress().entrySet().iterator().next();
+ assertEquals(TransactionType.LONG, inProgressTx.getValue().getType());
+
+ // save a new snapshot
+ txManager2.stopAndWait();
+
+ TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
+ txStorage2.startAndWait();
+
+ TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot();
+ // full snapshot should have deserialized correctly without any fixups
+ assertEquals(snapshot2.getInProgress(), snapshot3.getInProgress());
+ assertEquals(snapshot2, snapshot3);
+ txStorage2.stopAndWait();
+ }
+
+ @Test
+ public void testSnapshotCodecProviderConfiguration() throws Exception {
+ Configuration conf = new Configuration(false);
+ StringBuilder buf = new StringBuilder();
+ for (Class c : TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES) {
+ if (buf.length() > 0) {
+ buf.append(",\n ");
+ }
+ buf.append(c.getName());
+ }
+ conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, buf.toString());
+
+ SnapshotCodecProvider codecProvider = new SnapshotCodecProvider(conf);
+ SnapshotCodec v1codec = codecProvider.getCodecForVersion(new DefaultSnapshotCodec().getVersion());
+ assertNotNull(v1codec);
+ assertTrue(v1codec instanceof DefaultSnapshotCodec);
+
+ SnapshotCodec v2codec = codecProvider.getCodecForVersion(new SnapshotCodecV2().getVersion());
+ assertNotNull(v2codec);
+ assertTrue(v2codec instanceof SnapshotCodecV2);
+
+ SnapshotCodec v3codec = codecProvider.getCodecForVersion(new SnapshotCodecV3().getVersion());
+ assertNotNull(v3codec);
+ assertTrue(v3codec instanceof SnapshotCodecV3);
+
+ SnapshotCodec v4codec = codecProvider.getCodecForVersion(new SnapshotCodecV4().getVersion());
+ assertNotNull(v4codec);
+ assertTrue(v4codec instanceof SnapshotCodecV4);
+ }
+
+ @Test
+ public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressException {
+ File testDir = tmpDir.newFolder("testSnapshotCodecV4");
+ Configuration conf = new Configuration();
+ conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+
+ Injector injector = Guice.createInjector(new ConfigModule(conf),
+ new DiscoveryModules().getSingleNodeModules(),
+ new TransactionModules().getSingleNodeModules());
+
+ TransactionManager txManager = injector.getInstance(TransactionManager.class);
+ txManager.startAndWait();
+
+ // Create a transaction and a checkpoint transaction
+ Transaction transaction = txManager.startLong();
+ Transaction checkpointTx = txManager.checkpoint(transaction);
+
+ // shutdown to force a snapshot
+ txManager.stopAndWait();
+
+ // Validate the snapshot on disk
+ TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
+ txStorage.startAndWait();
+
+ TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
+ TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
+ assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
+
+ Map<Long, TransactionManager.InProgressTx> inProgress = snapshot.getInProgress();
+ Assert.assertEquals(1, inProgress.size());
+
+ TransactionManager.InProgressTx inProgressTx = inProgress.get(transaction.getTransactionId());
+ Assert.assertNotNull(inProgressTx);
+ Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
+ inProgressTx.getCheckpointWritePointers().toLongArray());
+
+ txStorage.stopAndWait();
+
+ // start a new Tx manager to see if the transaction is restored correctly.
+ Injector injector2 = Guice.createInjector(new ConfigModule(conf),
+ new DiscoveryModules().getSingleNodeModules(),
+ new TransactionModules().getSingleNodeModules());
+
+ txManager = injector2.getInstance(TransactionManager.class);
+ txManager.startAndWait();
+
+ // state should be recovered
+ snapshot = txManager.getCurrentState();
+ inProgress = snapshot.getInProgress();
+ Assert.assertEquals(1, inProgress.size());
+
+ inProgressTx = inProgress.get(transaction.getTransactionId());
+ Assert.assertNotNull(inProgressTx);
+ Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
+ inProgressTx.getCheckpointWritePointers().toLongArray());
+
+ // Should be able to commit the transaction
+ Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList()));
+ Assert.assertTrue(txManager.commit(checkpointTx));
+
+ // save a new snapshot
+ txManager.stopAndWait();
+
+ TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
+ txStorage2.startAndWait();
+
+ snapshot = txStorage2.getLatestSnapshot();
+ Assert.assertTrue(snapshot.getInProgress().isEmpty());
+ txStorage2.stopAndWait();
+ }
+
+ private void assertTransactionVisibilityStateEquals(TransactionVisibilityState expected,
+ TransactionVisibilityState input) {
+ Assert.assertEquals(expected.getTimestamp(), input.getTimestamp());
+ Assert.assertEquals(expected.getReadPointer(), input.getReadPointer());
+ Assert.assertEquals(expected.getWritePointer(), input.getWritePointer());
+ Assert.assertEquals(expected.getInProgress(), input.getInProgress());
+ Assert.assertEquals(expected.getInvalid(), input.getInvalid());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java b/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java
new file mode 100644
index 0000000..8526b75
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ *
+ */
+public abstract class AbstractConfigurationProviderTest {
+ @Test
+ public void testVersionFactory() {
+ HBaseVersion.Version foundVersion = HBaseVersion.get();
+ assertEquals(getExpectedVersion(), foundVersion);
+ }
+
+ protected abstract HBaseVersion.Version getExpectedVersion();
+
+ @Test
+ public void testConfigurationProvider() {
+ Configuration conf = new Configuration();
+ conf.set("foo", "bar");
+ Configuration newConf = new ConfigurationFactory().get(conf);
+ assertNotNull(newConf);
+ assertEquals("bar", newConf.get("foo"));
+ }
+}