Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:26 UTC

[26/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
new file mode 100644
index 0000000..628bced
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.util.TransactionEditUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Common tests to run against the {@link TransactionStateStorage} implementations.
+ */
+public abstract class AbstractTransactionStateStorageTest {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionStateStorageTest.class);
+  private static Random random = new Random();
+
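+  /** Returns a configuration for the given test, used to construct the storage implementation under test. */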
+  protected abstract Configuration getConfiguration(String testName) throws IOException;
+
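+  /** Returns the storage implementation under test, created from the given configuration. */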
+  protected abstract AbstractTransactionStateStorage getStorage(Configuration conf);
+
+  @Test
+  public void testSnapshotPersistence() throws Exception {
+    Configuration conf = getConfiguration("testSnapshotPersistence");
+
+    TransactionSnapshot snapshot = createRandomSnapshot();
+    TransactionStateStorage storage = getStorage(conf);
+    try {
+      storage.startAndWait();
+      storage.writeSnapshot(snapshot);
+
+      TransactionSnapshot readSnapshot = storage.getLatestSnapshot();
+      assertNotNull(readSnapshot);
+      assertEquals(snapshot, readSnapshot);
+    } finally {
+      storage.stopAndWait();
+    }
+  }
+
+  @Test
+  public void testLogWriteAndRead() throws Exception {
+    Configuration conf = getConfiguration("testLogWriteAndRead");
+
+    // create some random entries
+    List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(100);
+    TransactionStateStorage storage = getStorage(conf);
+    try {
+      long now = System.currentTimeMillis();
+      storage.startAndWait();
+      TransactionLog log = storage.createLog(now);
+      for (TransactionEdit edit : edits) {
+        log.append(edit);
+      }
+      log.close();
+
+      Collection<TransactionLog> logsToRead = storage.getLogsSince(now);
+      // should only be our one log
+      assertNotNull(logsToRead);
+      assertEquals(1, logsToRead.size());
+      TransactionLogReader logReader = logsToRead.iterator().next().getReader();
+      assertNotNull(logReader);
+
+      List<TransactionEdit> readEdits = Lists.newArrayListWithExpectedSize(edits.size());
+      TransactionEdit nextEdit;
+      while ((nextEdit = logReader.next()) != null) {
+        readEdits.add(nextEdit);
+      }
+      logReader.close();
+      assertEquals(edits.size(), readEdits.size());
+      for (int i = 0; i < edits.size(); i++) {
+        LOG.info("Checking edit " + i);
+        assertEquals(edits.get(i), readEdits.get(i));
+      }
+    } finally {
+      storage.stopAndWait();
+    }
+  }
+
+  @Test
+  public void testTransactionManagerPersistence() throws Exception {
+    Configuration conf = getConfiguration("testTransactionManagerPersistence");
+    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
+    // start snapshot thread, but with long enough interval so we only get snapshots on shutdown
+    conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 600);
+
+    TransactionStateStorage storage = null;
+    TransactionStateStorage storage2 = null;
+    TransactionStateStorage storage3 = null;
+    try {
+      storage = getStorage(conf);
+      TransactionManager txManager = new TransactionManager(conf, storage, new TxMetricsCollector());
+      txManager.startAndWait();
+
+      // TODO: replace with new persistence tests
+      final byte[] a = { 'a' };
+      final byte[] b = { 'b' };
+      // start a tx1, add a change A and commit
+      Transaction tx1 = txManager.startShort();
+      Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
+      Assert.assertTrue(txManager.commit(tx1));
+      // start a tx2 and add a change B
+      Transaction tx2 = txManager.startShort();
+      Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
+      // start a tx3
+      Transaction tx3 = txManager.startShort();
+      // stop the tx manager to simulate a restart; a final snapshot is persisted on shutdown
+      txManager.stopAndWait();
+      TransactionSnapshot origState = txManager.getCurrentState();
+      LOG.info("Orig state: " + origState);
+
+      Thread.sleep(100);
+      // starts a new tx manager
+      storage2 = getStorage(conf);
+      txManager = new TransactionManager(conf, storage2, new TxMetricsCollector());
+      txManager.startAndWait();
+
+      // check that the reloaded state matches the old
+      TransactionSnapshot newState = txManager.getCurrentState();
+      LOG.info("New state: " + newState);
+      assertEquals(origState, newState);
+
+      // commit tx2
+      Assert.assertTrue(txManager.commit(tx2));
+      // start another transaction, must be greater than tx3
+      Transaction tx4 = txManager.startShort();
+      Assert.assertTrue(tx4.getTransactionId() > tx3.getTransactionId());
+      // tx1 must be visible to tx2, but tx3 and tx4 must not be
+      Assert.assertTrue(tx2.isVisible(tx1.getTransactionId()));
+      Assert.assertFalse(tx2.isVisible(tx3.getTransactionId()));
+      Assert.assertFalse(tx2.isVisible(tx4.getTransactionId()));
+      // add same change for tx3
+      Assert.assertFalse(txManager.canCommit(tx3, Collections.singleton(b)));
+      // check visibility with a new transaction
+      Transaction tx5 = txManager.startShort();
+      Assert.assertTrue(tx5.isVisible(tx1.getTransactionId()));
+      Assert.assertTrue(tx5.isVisible(tx2.getTransactionId()));
+      Assert.assertFalse(tx5.isVisible(tx3.getTransactionId()));
+      Assert.assertFalse(tx5.isVisible(tx4.getTransactionId()));
+      // abort the remaining transactions
+      txManager.abort(tx3);
+      txManager.abort(tx4);
+      txManager.abort(tx5);
+      // start new tx and verify its exclude list is empty
+      Transaction tx6 = txManager.startShort();
+      Assert.assertFalse(tx6.hasExcludes());
+      txManager.abort(tx6);
+
+      // now start 5 x claim size transactions
+      Transaction tx = txManager.startShort();
+      for (int i = 1; i < 50; i++) {
+        tx = txManager.startShort();
+      }
+      origState = txManager.getCurrentState();
+
+      Thread.sleep(100);
+      // simulate crash by starting a new tx manager without a stopAndWait
+      storage3 = getStorage(conf);
+      txManager = new TransactionManager(conf, storage3, new TxMetricsCollector());
+      txManager.startAndWait();
+
+      // verify state again matches (this time should include WAL replay)
+      newState = txManager.getCurrentState();
+      assertEquals(origState, newState);
+
+      // get a new transaction and verify it is greater
+      Transaction txAfter = txManager.startShort();
+      Assert.assertTrue(txAfter.getTransactionId() > tx.getTransactionId());
+    } finally {
+      if (storage != null) {
+        storage.stopAndWait();
+      }
+      if (storage2 != null) {
+        storage2.stopAndWait();
+      }
+      if (storage3 != null) {
+        storage3.stopAndWait();
+      }
+    }
+  }
+
+  /**
+   * Tests whether the committed set is advanced properly on WAL replay.
+   */
+  @Test
+  public void testCommittedSetClearing() throws Exception {
+    Configuration conf = getConfiguration("testCommittedSetClearing");
+    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
+    conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 0); // no periodic snapshots
+
+    TransactionStateStorage storage1 = null;
+    TransactionStateStorage storage2 = null;
+    try {
+      storage1 = getStorage(conf);
+      TransactionManager txManager = new TransactionManager(conf, storage1, new TxMetricsCollector());
+      txManager.startAndWait();
+
+      // TODO: replace with new persistence tests
+      final byte[] a = { 'a' };
+      final byte[] b = { 'b' };
+      // start a tx1, add a change A and commit
+      Transaction tx1 = txManager.startShort();
+      Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
+      Assert.assertTrue(txManager.commit(tx1));
+      // start a tx2 and add a change B
+      Transaction tx2 = txManager.startShort();
+      Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
+      // start a tx3
+      Transaction tx3 = txManager.startShort();
+      TransactionSnapshot origState = txManager.getCurrentState();
+      LOG.info("Orig state: " + origState);
+
+      // simulate a failure by starting a new tx manager without stopping first
+      storage2 = getStorage(conf);
+      txManager = new TransactionManager(conf, storage2, new TxMetricsCollector());
+      txManager.startAndWait();
+
+      // check that the reloaded state matches the old
+      TransactionSnapshot newState = txManager.getCurrentState();
+      LOG.info("New state: " + newState);
+      assertEquals(origState, newState);
+
+    } finally {
+      if (storage1 != null) {
+        storage1.stopAndWait();
+      }
+      if (storage2 != null) {
+        storage2.stopAndWait();
+      }
+    }
+  }
+
+  /**
+   * Tests removal of old snapshots and old transaction logs.
+   */
+  @Test
+  public void testOldFileRemoval() throws Exception {
+    Configuration conf = getConfiguration("testOldFileRemoval");
+    TransactionStateStorage storage = null;
+    try {
+      storage = getStorage(conf);
+      storage.startAndWait();
+      long now = System.currentTimeMillis();
+      long writePointer = 1;
+      Collection<Long> invalid = Lists.newArrayList();
+      NavigableMap<Long, TransactionManager.InProgressTx> inprogress = Maps.newTreeMap();
+      Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
+      Map<Long, Set<ChangeId>> committed = Maps.newHashMap();
+      TransactionSnapshot snapshot = new TransactionSnapshot(now, 0, writePointer++, invalid,
+                                                             inprogress, committing, committed);
+      TransactionEdit dummyEdit = TransactionEdit.createStarted(1, 0, Long.MAX_VALUE, TransactionType.SHORT);
+
+      // write snapshot 1
+      storage.writeSnapshot(snapshot);
+      TransactionLog log = storage.createLog(now);
+      log.append(dummyEdit);
+      log.close();
+
+      snapshot = new TransactionSnapshot(now + 1, 0, writePointer++, invalid, inprogress, committing, committed);
+      // write snapshot 2
+      storage.writeSnapshot(snapshot);
+      log = storage.createLog(now + 1);
+      log.append(dummyEdit);
+      log.close();
+
+      snapshot = new TransactionSnapshot(now + 2, 0, writePointer++, invalid, inprogress, committing, committed);
+      // write snapshot 3
+      storage.writeSnapshot(snapshot);
+      log = storage.createLog(now + 2);
+      log.append(dummyEdit);
+      log.close();
+
+      snapshot = new TransactionSnapshot(now + 3, 0, writePointer++, invalid, inprogress, committing, committed);
+      // write snapshot 4
+      storage.writeSnapshot(snapshot);
+      log = storage.createLog(now + 3);
+      log.append(dummyEdit);
+      log.close();
+
+      snapshot = new TransactionSnapshot(now + 4, 0, writePointer++, invalid, inprogress, committing, committed);
+      // write snapshot 5
+      storage.writeSnapshot(snapshot);
+      log = storage.createLog(now + 4);
+      log.append(dummyEdit);
+      log.close();
+
+      snapshot = new TransactionSnapshot(now + 5, 0, writePointer++, invalid, inprogress, committing, committed);
+      // write snapshot 6
+      storage.writeSnapshot(snapshot);
+      log = storage.createLog(now + 5);
+      log.append(dummyEdit);
+      log.close();
+
+      List<String> allSnapshots = storage.listSnapshots();
+      LOG.info("All snapshots: " + allSnapshots);
+      assertEquals(6, allSnapshots.size());
+      List<String> allLogs = storage.listLogs();
+      LOG.info("All logs: " + allLogs);
+      assertEquals(6, allLogs.size());
+
+      long oldestKept = storage.deleteOldSnapshots(3);
+      assertEquals(now + 3, oldestKept);
+      allSnapshots = storage.listSnapshots();
+      LOG.info("All snapshots: " + allSnapshots);
+      assertEquals(3, allSnapshots.size());
+
+      storage.deleteLogsOlderThan(oldestKept);
+      allLogs = storage.listLogs();
+      LOG.info("All logs: " + allLogs);
+      assertEquals(3, allLogs.size());
+    } finally {
+      if (storage != null) {
+        storage.stopAndWait();
+      }
+    }
+  }
+  
+  @Test
+  public void testLongTxnEditReplay() throws Exception {
+    Configuration conf = getConfiguration("testLongTxnEditReplay");
+    TransactionStateStorage storage = null;
+    try {
+      storage = getStorage(conf);
+      storage.startAndWait();
+
+      // Create long running txns. Abort one of them, invalidate another, invalidate and abort the last.
+      long time1 = System.currentTimeMillis();
+      long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
+      TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG);
+      TransactionEdit edit2 = TransactionEdit.createAborted(wp1, TransactionType.LONG, null);
+
+      long time2 = time1 + 100;
+      long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
+      TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 100000, TransactionType.LONG);
+      TransactionEdit edit4 = TransactionEdit.createInvalid(wp2);
+
+      long time3 = time1 + 200;
+      long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
+      TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG);
+      TransactionEdit edit6 = TransactionEdit.createInvalid(wp3);
+      TransactionEdit edit7 = TransactionEdit.createAborted(wp3, TransactionType.LONG, null);
+      
+      // write transaction edits
+      TransactionLog log = storage.createLog(time1);
+      log.append(edit1);
+      log.append(edit2);
+      log.append(edit3);
+      log.append(edit4);
+      log.append(edit5);
+      log.append(edit6);
+      log.append(edit7);
+      log.close();
+
+      // Start transaction manager
+      TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
+      txm.startAndWait();
+      try {
+        // Verify that all txns are in invalid list.
+        TransactionSnapshot snapshot1 = txm.getCurrentState();
+        assertEquals(ImmutableList.of(wp1, wp2, wp3), snapshot1.getInvalid());
+        assertEquals(0, snapshot1.getInProgress().size());
+        assertEquals(0, snapshot1.getCommittedChangeSets().size());
+        assertEquals(0, snapshot1.getCommittingChangeSets().size());
+      } finally {
+        txm.stopAndWait();
+      }
+    } finally {
+      if (storage != null) {
+        storage.stopAndWait();
+      }
+    }
+  }
+
+  @Test
+  public void testTruncateInvalidTxEditReplay() throws Exception {
+    Configuration conf = getConfiguration("testTruncateInvalidTxEditReplay");
+    TransactionStateStorage storage = null;
+    try {
+      storage = getStorage(conf);
+      storage.startAndWait();
+
+      // Create some txns, and invalidate all of them.
+      long time1 = System.currentTimeMillis();
+      long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
+      TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG);
+      TransactionEdit edit2 = TransactionEdit.createInvalid(wp1);
+
+      long time2 = time1 + 100;
+      long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
+      TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 10000, TransactionType.SHORT);
+      TransactionEdit edit4 = TransactionEdit.createInvalid(wp2);
+
+      long time3 = time1 + 2000;
+      long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
+      TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG);
+      TransactionEdit edit6 = TransactionEdit.createInvalid(wp3);
+
+      long time4 = time1 + 2100;
+      long wp4 = time4 * TxConstants.MAX_TX_PER_MS;
+      TransactionEdit edit7 = TransactionEdit.createStarted(wp4, wp4 - 10, time4 + 10000, TransactionType.SHORT);
+      TransactionEdit edit8 = TransactionEdit.createInvalid(wp4);
+
+      // remove wp1 and wp3 from invalid list
+      TransactionEdit edit9 = TransactionEdit.createTruncateInvalidTx(ImmutableSet.of(wp1, wp3));
+      // truncate invalid transactions before time3
+      TransactionEdit edit10 = TransactionEdit.createTruncateInvalidTxBefore(time3);
+
+      // write transaction edits
+      TransactionLog log = storage.createLog(time1);
+      log.append(edit1);
+      log.append(edit2);
+      log.append(edit3);
+      log.append(edit4);
+      log.append(edit5);
+      log.append(edit6);
+      log.append(edit7);
+      log.append(edit8);
+      log.append(edit9);
+      log.append(edit10);
+      log.close();
+
+      // Start transaction manager
+      TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
+      txm.startAndWait();
+      try {
+        // Only wp4 should be in invalid list.
+        TransactionSnapshot snapshot = txm.getCurrentState();
+        assertEquals(ImmutableList.of(wp4), snapshot.getInvalid());
+        assertEquals(0, snapshot.getInProgress().size());
+        assertEquals(0, snapshot.getCommittedChangeSets().size());
+        assertEquals(0, snapshot.getCommittingChangeSets().size());
+      } finally {
+        txm.stopAndWait();
+      }
+    } finally {
+      if (storage != null) {
+        storage.stopAndWait();
+      }
+    }
+  }
+
+  /**
+   * Generates a new snapshot object with semi-randomly populated values.  This does not necessarily accurately
+   * represent a typical snapshot's distribution of values, as we only set an upper bound on pointer values.
+   *
+   * We generate a new snapshot with the contents:
+   * <ul>
+   *   <li>readPointer = 1M + (random % 1M)</li>
+   *   <li>writePointer = readPointer + 1000</li>
+ *   <li>inProgress = one each for (writePointer - 500)..(writePointer - 1), ~5% "long" transactions</li>
+ *   <li>invalid = 100 randomly distributed, 0..1M</li>
+ *   <li>committing = one each, readPointer..(readPointer + 99)</li>
+ *   <li>committed = one each, (readPointer - 1000)..(readPointer - 1)</li>
+   * </ul>
+   * @return a new snapshot of transaction state.
+   */
+  private TransactionSnapshot createRandomSnapshot() {
+    // limit readPointer to a reasonable range, but make it > 1M so we can assign enough keys below
+    long readPointer = (Math.abs(random.nextLong()) % 1000000L) + 1000000L;
+    long writePointer = readPointer + 1000L;
+
+    // generate in progress -- assume last 500 write pointer values
+    NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
+    long startPointer = writePointer - 500L;
+    for (int i = 0; i < 500; i++) {
+      long currentTime = System.currentTimeMillis();
+      // make some "long" transactions
+      if (i % 20 == 0) {
+        inProgress.put(startPointer + i,
+                       new TransactionManager.InProgressTx(startPointer - 1, currentTime + TimeUnit.DAYS.toSeconds(1),
+                                                           TransactionType.LONG));
+      } else {
+        inProgress.put(startPointer + i,
+                       new TransactionManager.InProgressTx(startPointer - 1, currentTime + 300000L, 
+                                                           TransactionType.SHORT));
+      }
+    }
+
+    // make 100 random invalid IDs
+    LongArrayList invalid = new LongArrayList();
+    for (int i = 0; i < 100; i++) {
+      invalid.add(Math.abs(random.nextLong()) % 1000000L);
+    }
+
+    // make 100 committing entries, 10 keys each
+    Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
+    for (int i = 0; i < 100; i++) {
+      committing.put(readPointer + i, generateChangeSet(10));
+    }
+
+    // make 1000 committed entries, 10 keys each
+    long startCommitted = readPointer - 1000L;
+    NavigableMap<Long, Set<ChangeId>> committed = Maps.newTreeMap();
+    for (int i = 0; i < 1000; i++) {
+      committed.put(startCommitted + i, generateChangeSet(10));
+    }
+
+    return new TransactionSnapshot(System.currentTimeMillis(), readPointer, writePointer,
+                                   invalid, inProgress, committing, committed);
+  }
+
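+  /** Generates a set of {@code numEntries} random 8-byte change ids. */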
+  private Set<ChangeId> generateChangeSet(int numEntries) {
+    Set<ChangeId> changes = Sets.newHashSet();
+    for (int i = 0; i < numEntries; i++) {
+      byte[] bytes = new byte[8];
+      random.nextBytes(bytes);
+      changes.add(new ChangeId(bytes));
+    }
+    return changes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java
new file mode 100644
index 0000000..22cdb59
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.tephra.TxConstants;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Unit Test for {@link CommitMarkerCodec}.
+ */
+public class CommitMarkerCodecTest {
+
+  @ClassRule
+  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+  private static final String LOG_FILE = "txlog";
+  private static final Random RANDOM = new Random();
+
+  private static MiniDFSCluster dfsCluster;
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Configuration hConf = new Configuration();
+    hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath());
+
+    dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
+    conf = new Configuration(dfsCluster.getFileSystem().getConf());
+    fs = FileSystem.newInstance(FileSystem.getDefaultUri(conf), conf);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    dfsCluster.shutdown();
+  }
+
+  @Test
+  public void testRandomCommitMarkers() throws Exception {
+    List<Integer> randomInts = new ArrayList<>();
+    Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
+
+    // Write a bunch of random commit markers
+    try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
+                                                                LongWritable.class,
+                                                                SequenceFile.CompressionType.NONE)) {
+      for (int i = 0; i < 1000; i++) {
+        int randomNum = RANDOM.nextInt(Integer.MAX_VALUE);
+        CommitMarkerCodec.writeMarker(writer, randomNum);
+        randomInts.add(randomNum);
+      }
+      writer.hflush();
+      writer.hsync();
+    }
+
+    // Read the commit markers back and verify them
+    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
+         CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
+      for (int num : randomInts) {
+        Assert.assertEquals(num, markerCodec.readMarker(reader));
+      }
+    }
+  }
+
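+  /** A {@link SequenceFile.ValueBytes} implementation that writes no value bytes, simulating an incomplete record. */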
+  private static class IncompleteValueBytes implements SequenceFile.ValueBytes {
+
+    @Override
+    public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
+      // don't write anything to simulate incomplete record
+    }
+
+    @Override
+    public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
+      throw new IllegalArgumentException("Not possible");
+    }
+
+    @Override
+    public int getSize() {
+      return Ints.BYTES;
+    }
+  }
+
+  @Test
+  public void testIncompleteCommitMarker() throws Exception {
+    Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
+    try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
+                                                                LongWritable.class,
+                                                                SequenceFile.CompressionType.NONE)) {
+      String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED;
+      SequenceFile.ValueBytes valueBytes = new IncompleteValueBytes();
+      writer.appendRaw(key.getBytes(), 0, key.length(), valueBytes);
+      writer.hflush();
+      writer.hsync();
+    }
+
+    // Read the incomplete commit marker
+    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
+         CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
+      try {
+        markerCodec.readMarker(reader);
+        Assert.fail("Expected EOF Exception to be thrown");
+      } catch (EOFException e) {
+        // expected since we didn't write the value bytes
+      }
+    }
+  }
+
+  @Test
+  public void testIncorrectCommitMarker() throws Exception {
+    Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
+
+    // Write an incorrect marker
+    try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
+                                                                LongWritable.class,
+                                                                SequenceFile.CompressionType.NONE)) {
+      String invalidKey = "IncorrectKey";
+      SequenceFile.ValueBytes valueBytes = new CommitMarkerCodec.CommitEntriesCount(100);
+      writer.appendRaw(invalidKey.getBytes(), 0, invalidKey.length(), valueBytes);
+      writer.hflush();
+      writer.hsync();
+    }
+
+    // Reading a marker written with an unexpected key should fail with an IOException
+    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
+         CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
+      try {
+        markerCodec.readMarker(reader);
+        Assert.fail("Expected an IOException to be thrown");
+      } catch (IOException e) {
+        // expected
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
new file mode 100644
index 0000000..7b9f06b
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.io.Closeables;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.util.TransactionEditUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Tests complete and partial syncs of {@link TransactionEdit}s to an {@link HDFSTransactionLog}.
+ */
+public class HDFSTransactionLogTest {
+  @ClassRule
+  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+  private static final String LOG_FILE_PREFIX = "txlog.";
+
+  private static MiniDFSCluster dfsCluster;
+  private static Configuration conf;
+  private static MetricsCollector metricsCollector;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Configuration hConf = new Configuration();
+    hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath());
+
+    dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
+    conf = new Configuration(dfsCluster.getFileSystem().getConf());
+    metricsCollector = new TxMetricsCollector();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    dfsCluster.shutdown();
+  }
+
+  private Configuration getConfiguration() throws IOException {
+    // tests should use the current user for HDFS
+    conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, TMP_FOLDER.newFolder().getAbsolutePath());
+    return conf;
+  }
+
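+  // creates an HDFSTransactionLog backed by a log file named for the given timestamp in the snapshot directory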
+  private HDFSTransactionLog getHDFSTransactionLog(Configuration conf,
+                                                   FileSystem fs, long timeInMillis) throws Exception {
+    String snapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
+    Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
+    return new HDFSTransactionLog(fs, conf, newLog, timeInMillis, metricsCollector);
+  }
+
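+  // creates a SequenceFile writer for the log file; withMarker records the current log version in the file metadata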
+  private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs,
+                                                    long timeInMillis, boolean withMarker) throws IOException {
+    String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
+    Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
+    SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+    if (withMarker) {
+      metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
+                   new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
+    }
+    return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
+                                     TransactionEdit.class, SequenceFile.CompressionType.NONE, null, null, metadata);
+  }
+
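+  // writes a commit marker recording the number of entries in the batch that follows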
+  private void writeNumWrites(SequenceFile.Writer writer, final int size) throws Exception {
+    CommitMarkerCodec.writeMarker(writer, size);
+  }
+
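+  /**
+   * Writes {@code totalCount} edits to a transaction log in batches of {@code batchSize}, optionally preceding
+   * each batch with a commit marker. The final record is written either completely or truncated, depending on
+   * {@code isComplete}, and the log is then read back to verify how many edits can be recovered.
+   */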
+  private void testTransactionLogSync(int totalCount, int batchSize, boolean withMarker, boolean isComplete)
+    throws Exception {
+    List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
+    long timestamp = System.currentTimeMillis();
+    Configuration configuration = getConfiguration();
+    FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
+    SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, withMarker);
+    AtomicLong logSequence = new AtomicLong();
+    HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
+    AbstractTransactionLog.Entry entry;
+
+    for (int i = 0; i < totalCount - batchSize; i += batchSize) {
+      if (withMarker) {
+        writeNumWrites(writer, batchSize);
+      }
+      for (int j = 0; j < batchSize; j++) {
+        entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j));
+        writer.append(entry.getKey(), entry.getEdit());
+      }
+      writer.syncFs();
+    }
+
+    if (withMarker) {
+      writeNumWrites(writer, batchSize);
+    }
+
+    for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
+      entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
+      writer.append(entry.getKey(), entry.getEdit());
+    }
+
+    entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()),
+                                             edits.get(totalCount - 1));
+    if (isComplete) {
+      writer.append(entry.getKey(), entry.getEdit());
+    } else {
+      byte[] bytes = Longs.toByteArray(entry.getKey().get());
+      writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() {
+        @Override
+        public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
+          byte[] test = new byte[]{0x2};
+          outStream.write(test, 0, 1);
+        }
+
+        @Override
+        public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
+          // no-op
+        }
+
+        @Override
+        public int getSize() {
+          // mimic size longer than the actual byte array size written, so we would reach EOF
+          return 12;
+        }
+      });
+    }
+    writer.syncFs();
+    Closeables.closeQuietly(writer);
+
+    // now let's try to read this log
+    TransactionLogReader reader = transactionLog.getReader();
+    int syncedEdits = 0;
+    while (reader.next() != null) {
+      // count the edits that can be read back
+      syncedEdits++;
+    }
+    if (isComplete) {
+      Assert.assertEquals(totalCount, syncedEdits);
+    } else {
+      Assert.assertEquals(totalCount - batchSize, syncedEdits);
+    }
+  }
+
+  @Test
+  public void testTransactionLogNewVersion() throws Exception {
+    // incomplete sync
+    testTransactionLogSync(1000, 1, true, false);
+    testTransactionLogSync(2000, 5, true, false);
+
+    // complete sync
+    testTransactionLogSync(1000, 1, true, true);
+    testTransactionLogSync(2000, 5, true, true);
+  }
+
+  @Test
+  public void testTransactionLogOldVersion() throws Exception {
+    // incomplete sync
+    testTransactionLogSync(1000, 1, false, false);
+
+    // complete sync
+    testTransactionLogSync(2000, 5, false, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java
new file mode 100644
index 0000000..f0eb9e5
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+
+/**
+ * Tests persistence of transaction snapshots and write-ahead logs to HDFS storage, using the
+ * {@link HDFSTransactionStateStorage} and {@link HDFSTransactionLog} implementations.
+ */
+public class HDFSTransactionStateStorageTest extends AbstractTransactionStateStorageTest {
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  private static MiniDFSCluster dfsCluster;
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Configuration hConf = new Configuration();
+    hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
+
+    dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
+    conf = new Configuration(dfsCluster.getFileSystem().getConf());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    dfsCluster.shutdown();
+  }
+
+  @Override
+  protected Configuration getConfiguration(String testName) throws IOException {
+    // tests should use the current user for HDFS
+    conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+    return conf;
+  }
+
+  @Override
+  protected AbstractTransactionStateStorage getStorage(Configuration conf) {
+    return new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java b/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java
new file mode 100644
index 0000000..366542b
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+
+/**
+ * Stores the latest transaction snapshot and logs in memory.
+ */
+public class InMemoryTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
+  // only keeps the most recent snapshot in memory
+  private TransactionSnapshot lastSnapshot;
+
+  private NavigableMap<Long, TransactionLog> logs = new TreeMap<>();
+
+  @Override
+  protected void startUp() throws Exception {
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    lastSnapshot = null;
+    logs = new TreeMap<>();
+  }
+
+  @Override
+  public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
+    // no codecs in in-memory mode
+  }
+
+  @Override
+  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
+    lastSnapshot = snapshot;
+  }
+
+  @Override
+  public TransactionSnapshot getLatestSnapshot() throws IOException {
+    return lastSnapshot;
+  }
+
+  @Override
+  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
+    return lastSnapshot;
+  }
+
+  @Override
+  public long deleteOldSnapshots(int numberToKeep) throws IOException {
+    // always only keep the last snapshot
+    return lastSnapshot.getTimestamp();
+  }
+
+  @Override
+  public List<String> listSnapshots() throws IOException {
+    List<String> snapshots = Lists.newArrayListWithCapacity(1);
+    if (lastSnapshot != null) {
+      snapshots.add(Long.toString(lastSnapshot.getTimestamp()));
+    }
+    return snapshots;
+  }
+
+  @Override
+  public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
+    return Lists.newArrayList(logs.tailMap(timestamp).values());
+  }
+
+  @Override
+  public TransactionLog createLog(long timestamp) throws IOException {
+    TransactionLog log = new InMemoryTransactionLog(timestamp);
+    logs.put(timestamp, log);
+    return log;
+  }
+
+  @Override
+  public void deleteLogsOlderThan(long timestamp) throws IOException {
+    Iterator<Map.Entry<Long, TransactionLog>> logIter = logs.entrySet().iterator();
+    while (logIter.hasNext()) {
+      Map.Entry<Long, TransactionLog> logEntry = logIter.next();
+      if (logEntry.getKey() < timestamp) {
+        logIter.remove();
+      }
+    }
+  }
+
+  @Override
+  public void setupStorage() throws IOException {
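+    // nothing to set up for in-memory storage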
+  }
+
+  @Override
+  public List<String> listLogs() throws IOException {
+    return Lists.transform(Lists.newArrayList(logs.keySet()), new Function<Long, String>() {
+      @Nullable
+      @Override
+      public String apply(@Nullable Long input) {
+        return input.toString();
+      }
+    });
+  }
+
+  @Override
+  public String getLocation() {
+    return "in-memory";
+  }
+
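+  /** A {@link TransactionLog} that buffers appended edits in memory. */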
+  public static class InMemoryTransactionLog implements TransactionLog {
+    private long timestamp;
+    private List<TransactionEdit> edits = Lists.newArrayList();
+    boolean isClosed = false;
+    public InMemoryTransactionLog(long timestamp) {
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String getName() {
+      return "in-memory@" + timestamp;
+    }
+
+    @Override
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public void append(TransactionEdit edit) throws IOException {
+      if (isClosed) {
+        throw new IOException("Log is closed");
+      }
+      edits.add(edit);
+    }
+
+    @Override
+    public void append(List<TransactionEdit> edits) throws IOException {
+      if (isClosed) {
+        throw new IOException("Log is closed");
+      }
+      this.edits.addAll(edits);
+    }
+
+    @Override
+    public void close() {
+      isClosed = true;
+    }
+
+    @Override
+    public TransactionLogReader getReader() throws IOException {
+      return new InMemoryLogReader(edits.iterator());
+    }
+  }
+
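+  /** A {@link TransactionLogReader} over the edits buffered by an {@link InMemoryTransactionLog}. */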
+  public static class InMemoryLogReader implements TransactionLogReader {
+    private final Iterator<TransactionEdit> editIterator;
+
+    public InMemoryLogReader(Iterator<TransactionEdit> editIterator) {
+      this.editIterator = editIterator;
+    }
+
+    @Override
+    public TransactionEdit next() throws IOException {
+      if (editIterator.hasNext()) {
+        return editIterator.next();
+      }
+      return null;
+    }
+
+    @Override
+    public TransactionEdit next(TransactionEdit reuse) throws IOException {
+      return next();
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java
new file mode 100644
index 0000000..9535102
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.snapshot.DefaultSnapshotCodec;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.snapshot.SnapshotCodecV4;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Runs transaction persistence tests against the {@link LocalFileTransactionStateStorage} and
+ * {@link LocalFileTransactionLog} implementations.
+ */
+public class LocalTransactionStateStorageTest extends AbstractTransactionStateStorageTest {
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
+
+  @Override
+  protected Configuration getConfiguration(String testName) throws IOException {
+    File testDir = tmpDir.newFolder(testName);
+    Configuration conf = new Configuration();
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
+    return conf;
+  }
+
+  @Override
+  protected AbstractTransactionStateStorage getStorage(Configuration conf) {
+    return new LocalFileTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
+  }
+
+  // A TransactionEdit that serializes itself using the legacy V2 codec, simulating edits from older versions.
+  @SuppressWarnings("deprecation")
+  private class TransactionEditV2 extends TransactionEdit {
+    public TransactionEditV2(long writePointer, long visibilityUpperBound, State state, long expirationDate,
+                             Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type) {
+      super(writePointer, visibilityUpperBound, state, expirationDate, changes, commitPointer, canCommit, type, 
+            null, 0L, 0L, null);
+    }
+    @Override
+    public void write(DataOutput out) throws IOException {
+      TransactionEditCodecs.encode(this, out, new TransactionEditCodecs.TransactionEditCodecV2());
+    }
+  }
+
+  // Note: this test cannot live in AbstractTransactionStateStorageTest, because SequenceFile rejects values whose
+  // class is not exactly TransactionEdit. Since the code path being verified is shared between the HDFS and local
+  // storage implementations, running it only here is sufficient.
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testLongTxnBackwardsCompatibility() throws Exception {
+    Configuration conf = getConfiguration("testLongTxnBackwardsCompatibility");
+
+    // Use SnapshotCodec version 1
+    String latestSnapshotCodec = conf.get(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
+    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+
+    TransactionStateStorage storage = null;
+    try {
+      storage = getStorage(conf);
+      storage.startAndWait();
+
+      // Create a snapshot and edits in the old format, where long-running txns had -1 expiration.
+      Collection<Long> invalid = Lists.newArrayList();
+      NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
+      long time1 = System.currentTimeMillis();
+      long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
+      inProgress.put(wp1, new TransactionManager.InProgressTx(wp1 - 5, -1L));
+      long time2 = time1 + 100;
+      long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
+      inProgress.put(wp2, new TransactionManager.InProgressTx(wp2 - 50, time2 + 1000));
+      Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
+      Map<Long, Set<ChangeId>> committed = Maps.newHashMap();
+      TransactionSnapshot snapshot = new TransactionSnapshot(time2, 0, wp2, invalid,
+                                                             inProgress, committing, committed);
+      long time3 = time1 + 200;
+      long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
+      TransactionEdit edit1 = new TransactionEditV2(wp3, wp3 - 10, TransactionEdit.State.INPROGRESS, -1L,
+                                                    null, 0L, false, null);
+      long time4 = time1 + 300;
+      long wp4 = time4  * TxConstants.MAX_TX_PER_MS;
+      TransactionEdit edit2 = new TransactionEditV2(wp4, wp4 - 10, TransactionEdit.State.INPROGRESS, time4 + 1000,
+                                                    null, 0L, false, null);
+
+      // write snapshot and transaction edit
+      storage.writeSnapshot(snapshot);
+      TransactionLog log = storage.createLog(time2);
+      log.append(edit1);
+      log.append(edit2);
+      log.close();
+
+      // Start transaction manager
+      conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, latestSnapshotCodec);
+      long longTimeout = TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.Manager.CFG_TX_LONG_TIMEOUT,
+                                                                TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT));
+      TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
+      txm.startAndWait();
+      try {
+        // Verify that the txns in old format were read correctly.
+        // There should be four in-progress transactions, and no invalid transactions
+        TransactionSnapshot snapshot1 = txm.getCurrentState();
+        Assert.assertEquals(ImmutableSortedSet.of(wp1, wp2, wp3, wp4), snapshot1.getInProgress().keySet());
+        verifyInProgress(snapshot1.getInProgress().get(wp1), TransactionType.LONG, time1 + longTimeout);
+        verifyInProgress(snapshot1.getInProgress().get(wp2), TransactionType.SHORT, time2 + 1000);
+        verifyInProgress(snapshot1.getInProgress().get(wp3), TransactionType.LONG, time3 + longTimeout);
+        verifyInProgress(snapshot1.getInProgress().get(wp4), TransactionType.SHORT, time4 + 1000);
+        Assert.assertEquals(0, snapshot1.getInvalid().size());
+      } finally {
+        txm.stopAndWait();
+      }
+    } finally {
+      if (storage != null) {
+        storage.stopAndWait();
+      }
+    }
+  }
+
+  // Note: this test cannot live in AbstractTransactionStateStorageTest, because SequenceFile rejects values whose
+  // class is not exactly TransactionEdit. Since the code path being verified is shared between the HDFS and local
+  // storage implementations, running it only here is sufficient.
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testAbortEditBackwardsCompatibility() throws Exception {
+    Configuration conf = getConfiguration("testAbortEditBackwardsCompatibility");
+
+    TransactionStateStorage storage = null;
+    try {
+      storage = getStorage(conf);
+      storage.startAndWait();
+
+      // Create edits for transaction type addition to abort
+      long time1 = System.currentTimeMillis();
+      long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
+      TransactionEdit edit1 = new TransactionEditV2(wp1, wp1 - 10, TransactionEdit.State.INPROGRESS, -1L,
+                                                    null, 0L, false, null);
+      TransactionEdit edit2 = new TransactionEditV2(wp1, 0L, TransactionEdit.State.ABORTED, 0L,
+                                                    null, 0L, false, null);
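+      // edit2 aborts wp1 without recording a transaction type; since edit1 carried a -1 expiration,
+      // recovery must treat wp1 as long-running and move it to the invalid list (verified below).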
+
+      long time2 = time1 + 400;
+      long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
+      TransactionEdit edit3 = new TransactionEditV2(wp2, wp2 - 10, TransactionEdit.State.INPROGRESS, time2 + 10000,
+                                                    null, 0L, false, null);
+      TransactionEdit edit4 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.INVALID, 0L, null, 0L, false, null);
+      // Simulate case where we cannot determine txn state during abort
+      TransactionEdit edit5 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.ABORTED, 0L, null, 0L, false, null);
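+      // Replaying an abort for a transaction already marked invalid (edit4) must not fail and should
+      // leave wp2 in the invalid list (verified below).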
+
+      // write snapshot and transaction edit
+      TransactionLog log = storage.createLog(time1);
+      log.append(edit1);
+      log.append(edit2);
+      log.append(edit3);
+      log.append(edit4);
+      log.append(edit5);
+      log.close();
+
+      // Start transaction manager
+      TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
+      txm.startAndWait();
+      try {
+        // Verify that the txns in old format were read correctly.
+        // Both transactions should be in invalid state
+        TransactionSnapshot snapshot1 = txm.getCurrentState();
+        Assert.assertEquals(ImmutableList.of(wp1, wp2), snapshot1.getInvalid());
+        Assert.assertEquals(0, snapshot1.getInProgress().size());
+        Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size());
+        Assert.assertEquals(0, snapshot1.getCommittingChangeSets().size());
+      } finally {
+        txm.stopAndWait();
+      }
+    } finally {
+      if (storage != null) {
+        storage.stopAndWait();
+      }
+    }
+  }
+
+  private void verifyInProgress(TransactionManager.InProgressTx inProgressTx, TransactionType type,
+                                long expiration) throws Exception {
+    Assert.assertEquals(type, inProgressTx.getType());
+    Assert.assertEquals(expiration, inProgressTx.getExpiration());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java
new file mode 100644
index 0000000..46cb81c
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * Tests for {@link TransactionEdit} serialization and codec compatibility.
+ */
+public class TransactionEditTest {
+  private static final byte[] COL = new byte[] {'c'};
+
+  @Test
+  public void testV1SerdeCompat() throws Exception {
+    TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV1();
+    // the started and committed tx edits cover all fields of a tx edit
+    // NOTE: visibilityUpperBound is set to 0 and the transaction type to null, as these are the
+    // expected defaults when decoding older versions that don't store them
+    verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 0L, 1000L, null), olderCodec);
+    verifyDecodingSupportsOlderVersion(
+      TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec);
+  }
+  
+  @Test
+  public void testV2SerdeCompat() throws Exception {
+    TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV2();
+    // the started and committed tx edits cover all fields of a tx edit
+    // NOTE: the transaction type is set to null, as this is the expected default when decoding
+    // older versions that don't store it
+    verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 100L, 1000L, null), olderCodec);
+    verifyDecodingSupportsOlderVersion(
+      TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec);
+  }
+
+  @SuppressWarnings("deprecation")
+  private void verifyDecodingSupportsOlderVersion(TransactionEdit edit, 
+                                                  TransactionEditCodecs.TransactionEditCodec olderCodec)
+    throws IOException {
+    // encoding with older version of codec
+    ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    TransactionEditCodecs.encode(edit, out, olderCodec);
+
+    // decode with the current TransactionEdit implementation, which must handle the older format
+    TransactionEdit decodedEdit = new TransactionEdit();
+    DataInput in = ByteStreams.newDataInput(out.toByteArray());
+    decodedEdit.readFields(in);
+
+    Assert.assertEquals(edit, decodedEdit);
+  }
+
+  @Test
+  public void testSerialization() throws Exception {
+    assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[0]));
+    assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[]{ 2L, 3L }));
+    assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[0]));
+    assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[]{ 2L, 3L }));
+
+    assertSerializedEdit(TransactionEdit.createCheckpoint(2L, 1L));
+
+    assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, false));
+    assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, true));
+    assertSerializedEdit(TransactionEdit.createCommitted(1L,
+        Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'})), 2L, false));
+    assertSerializedEdit(TransactionEdit.createCommitted(1L,
+        Sets.newHashSet(new ChangeId(new byte[]{ 'a', 'b', 'c' }), new ChangeId(new byte[]{ 'd', 'e', 'f' })),
+        2L, true));
+
+    assertSerializedEdit(TransactionEdit.createCommitting(1L, Sets.<ChangeId>newHashSet()));
+    assertSerializedEdit(TransactionEdit.createCommitting(1L,
+        Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'}))));
+    assertSerializedEdit(TransactionEdit.createCommitting(1L,
+        Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'}), new ChangeId(new byte[]{'d', 'e', 'f'}))));
+
+    assertSerializedEdit(TransactionEdit.createInvalid(1L));
+
+    assertSerializedEdit(TransactionEdit.createMoveWatermark(10L));
+
+    assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 1000,
+        TransactionType.SHORT));
+    assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 10000,
+        TransactionType.LONG));
+
+    assertSerializedEdit(TransactionEdit.createTruncateInvalidTx(Sets.newHashSet(1L)));
+    assertSerializedEdit(TransactionEdit.createTruncateInvalidTx(Sets.newHashSet(1L, 2L, 3L)));
+
+    assertSerializedEdit(TransactionEdit.createTruncateInvalidTxBefore(System.currentTimeMillis()));
+  }
+
+  private void assertSerializedEdit(TransactionEdit originalEdit) throws IOException {
+    ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    originalEdit.write(out);
+
+    TransactionEdit decodedEdit = new TransactionEdit();
+    DataInput in = ByteStreams.newDataInput(out.toByteArray());
+    decodedEdit.readFields(in);
+
+    Assert.assertEquals(originalEdit, decodedEdit);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
new file mode 100644
index 0000000..afdff5c
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionModules;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests related to {@link SnapshotCodec} implementations.
+ */
+public class SnapshotCodecTest {
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
+
+  @Test
+  public void testMinimalDeserialization() throws Exception {
+    long now = System.currentTimeMillis();
+    long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
+    /*
+     * Snapshot consisting of transactions at the following write pointers:
+     */
+    long tInvalid = nowWritePointer - 5;    // t1 - invalid
+    long readPtr = nowWritePointer - 4;     // t2 - here and earlier committed
+    long tLong = nowWritePointer - 3;       // t3 - in-progress LONG
+    long tCommitted = nowWritePointer - 2;  // t4 - committed, with an empty changeset
+    long tShort = nowWritePointer - 1;      // t5 - in-progress SHORT, canCommit called, with an empty changeset
+
+    TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
+      tLong, new TransactionManager.InProgressTx(readPtr,
+                                                 TransactionManager.getTxExpirationFromWritePointer(
+                                                   tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
+                                                 TransactionType.LONG),
+      tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
+
+    TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
+                                                           Lists.newArrayList(tInvalid), // invalid
+                                                           inProgress, ImmutableMap.<Long, Set<ChangeId>>of(
+                                                             tShort, Sets.<ChangeId>newHashSet()),
+                                                           ImmutableMap.<Long, Set<ChangeId>>of(
+                                                             tCommitted, Sets.<ChangeId>newHashSet()));
+
+    Configuration conf1 = new Configuration();
+    conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
+    SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
+
+    byte[] byteArray;
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      provider1.encode(out, snapshot);
+      byteArray = out.toByteArray();
+    }
+
+    // TransactionSnapshot and TransactionVisibilityState decode should pass now
+    TransactionSnapshot txSnapshot = provider1.decode(new ByteArrayInputStream(byteArray));
+    TransactionVisibilityState txVisibilityState =
+      provider1.decodeTransactionVisibilityState(new ByteArrayInputStream(byteArray));
+    assertTransactionVisibilityStateEquals(txSnapshot, txVisibilityState);
+
+    // Corrupt the serialization byte array so that full deserialization will fail
+    byteArray[byteArray.length - 1] = 'a';
+
+    // TransactionVisibilityState decoding should pass since it doesn't decode the committing and committed changesets.
+    TransactionVisibilityState txVisibilityState2 = provider1.decodeTransactionVisibilityState(
+      new ByteArrayInputStream(byteArray));
+    Assert.assertNotNull(txVisibilityState2);
+    Assert.assertEquals(txVisibilityState, txVisibilityState2);
+    Assert.assertEquals(readPtr, txVisibilityState2.getReadPointer());
+    try {
+      provider1.decode(new ByteArrayInputStream(byteArray));
+      Assert.fail();
+    } catch (RuntimeException e) {
+      // expected since we modified the serialization bytes
+    }
+  }
+
+  /**
+   * In-progress LONG transactions written with DefaultSnapshotCodec will not have the type serialized as part of
+   * the data.  Since these transactions also contain a non-negative expiration, we need to ensure we reset the type
+   * correctly when the snapshot is loaded.
+   */
+  @Test
+  public void testDefaultToV3Compatibility() throws Exception {
+    long now = System.currentTimeMillis();
+    long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
+    /*
+     * Snapshot consisting of transactions at the following write pointers:
+     */
+    long tInvalid = nowWritePointer - 5;    // t1 - invalid
+    long readPtr = nowWritePointer - 4;     // t2 - here and earlier committed
+    long tLong = nowWritePointer - 3;       // t3 - in-progress LONG
+    long tCommitted = nowWritePointer - 2;  // t4 - committed, changeset (r1, r2)
+    long tShort = nowWritePointer - 1;      // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
+
+    TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
+        tLong, new TransactionManager.InProgressTx(readPtr,
+            TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
+            TransactionType.LONG),
+        tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
+
+    TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
+        Lists.newArrayList(tInvalid), // invalid
+        inProgress,
+        ImmutableMap.<Long, Set<ChangeId>>of(
+            tShort, Sets.newHashSet(new ChangeId(new byte[]{'r', '3'}), new ChangeId(new byte[]{'r', '4'}))),
+        ImmutableMap.<Long, Set<ChangeId>>of(
+            tCommitted, Sets.newHashSet(new ChangeId(new byte[]{'r', '1'}), new ChangeId(new byte[]{'r', '2'}))));
+
+    Configuration conf1 = new Configuration();
+    conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+    SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
+
+    byte[] encoded;
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      provider1.encode(out, snapshot);
+      encoded = out.toByteArray();
+    }
+
+    TransactionSnapshot snapshot2 = provider1.decode(new ByteArrayInputStream(encoded));
+    TransactionVisibilityState minTxSnapshot = provider1.decodeTransactionVisibilityState(
+      new ByteArrayInputStream(encoded));
+    assertTransactionVisibilityStateEquals(snapshot2, minTxSnapshot);
+
+    assertEquals(snapshot.getReadPointer(), snapshot2.getReadPointer());
+    assertEquals(snapshot.getWritePointer(), snapshot2.getWritePointer());
+    assertEquals(snapshot.getInvalid(), snapshot2.getInvalid());
+    // in-progress transactions will have missing types
+    assertNotEquals(snapshot.getInProgress(), snapshot2.getInProgress());
+    assertEquals(snapshot.getCommittingChangeSets(), snapshot2.getCommittingChangeSets());
+    assertEquals(snapshot.getCommittedChangeSets(), snapshot2.getCommittedChangeSets());
+
+    // after fixing in-progress, full snapshot should match
+    Map<Long, TransactionManager.InProgressTx> fixedInProgress = TransactionManager.txnBackwardsCompatCheck(
+        TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT, 10000L, snapshot2.getInProgress());
+    assertEquals(snapshot.getInProgress(), fixedInProgress);
+    assertEquals(snapshot, snapshot2);
+  }
+
+  /**
+   * Test full stack serialization for a TransactionManager migrating from DefaultSnapshotCodec to SnapshotCodecV3.
+   */
+  @Test
+  public void testDefaultToV3Migration() throws Exception {
+    File testDir = tmpDir.newFolder("testDefaultToV3Migration");
+    Configuration conf = new Configuration();
+    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+
+    Injector injector = Guice.createInjector(new ConfigModule(conf),
+        new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
+
+    TransactionManager txManager = injector.getInstance(TransactionManager.class);
+    txManager.startAndWait();
+
+    txManager.startLong();
+
+    // shutdown to force a snapshot
+    txManager.stopAndWait();
+
+    TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
+    txStorage.startAndWait();
+
+    // confirm that the in-progress entry is missing a type
+    TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
+    TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
+    assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
+    assertNotNull(snapshot);
+    assertEquals(1, snapshot.getInProgress().size());
+    Map.Entry<Long, TransactionManager.InProgressTx> entry =
+        snapshot.getInProgress().entrySet().iterator().next();
+    assertNull(entry.getValue().getType());
+    txStorage.stopAndWait();
+
+    // start a new Tx manager to test fixup
+    Configuration conf2 = new Configuration();
+    conf2.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+    conf2.setStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES,
+                     DefaultSnapshotCodec.class.getName(), SnapshotCodecV3.class.getName());
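+    // Configuring both codecs allows the old-format snapshot to be read while new snapshots are
+    // written in the V3 format (the snapshot written below deserializes without any fixups).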
+    Injector injector2 = Guice.createInjector(new ConfigModule(conf2),
+        new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
+
+    TransactionManager txManager2 = injector2.getInstance(TransactionManager.class);
+    txManager2.startAndWait();
+
+    // state should be recovered
+    TransactionSnapshot snapshot2 = txManager2.getCurrentState();
+    assertEquals(1, snapshot2.getInProgress().size());
+    Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx =
+        snapshot2.getInProgress().entrySet().iterator().next();
+    assertEquals(TransactionType.LONG, inProgressTx.getValue().getType());
+
+    // save a new snapshot
+    txManager2.stopAndWait();
+
+    TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
+    txStorage2.startAndWait();
+
+    TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot();
+    // full snapshot should have deserialized correctly without any fixups
+    assertEquals(snapshot2.getInProgress(), snapshot3.getInProgress());
+    assertEquals(snapshot2, snapshot3);
+    txStorage2.stopAndWait();
+  }
+
+  @Test
+  public void testSnapshotCodecProviderConfiguration() throws Exception {
+    Configuration conf = new Configuration(false);
+    StringBuilder buf = new StringBuilder();
+    for (Class<?> c : TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES) {
+      if (buf.length() > 0) {
+        buf.append(",\n    ");
+      }
+      buf.append(c.getName());
+    }
+    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, buf.toString());
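+    // The separator built above deliberately contains a newline and indentation; the lookups below
+    // only pass if whitespace around the configured codec class names is tolerated.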
+
+    SnapshotCodecProvider codecProvider = new SnapshotCodecProvider(conf);
+    SnapshotCodec v1codec = codecProvider.getCodecForVersion(new DefaultSnapshotCodec().getVersion());
+    assertNotNull(v1codec);
+    assertTrue(v1codec instanceof DefaultSnapshotCodec);
+
+    SnapshotCodec v2codec = codecProvider.getCodecForVersion(new SnapshotCodecV2().getVersion());
+    assertNotNull(v2codec);
+    assertTrue(v2codec instanceof SnapshotCodecV2);
+
+    SnapshotCodec v3codec = codecProvider.getCodecForVersion(new SnapshotCodecV3().getVersion());
+    assertNotNull(v3codec);
+    assertTrue(v3codec instanceof SnapshotCodecV3);
+
+    SnapshotCodec v4codec = codecProvider.getCodecForVersion(new SnapshotCodecV4().getVersion());
+    assertNotNull(v4codec);
+    assertTrue(v4codec instanceof SnapshotCodecV4);
+  }
+
+  @Test
+  public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressException {
+    File testDir = tmpDir.newFolder("testSnapshotCodecV4");
+    Configuration conf = new Configuration();
+    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+
+    Injector injector = Guice.createInjector(new ConfigModule(conf),
+                                             new DiscoveryModules().getSingleNodeModules(),
+                                             new TransactionModules().getSingleNodeModules());
+
+    TransactionManager txManager = injector.getInstance(TransactionManager.class);
+    txManager.startAndWait();
+
+    // Create a transaction and a checkpoint transaction
+    Transaction transaction = txManager.startLong();
+    Transaction checkpointTx = txManager.checkpoint(transaction);
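+    // Checkpointing issues an additional write pointer for the transaction; the V4 codec must
+    // persist the checkpoint write pointers so they survive in the snapshot (verified below).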
+
+    // shutdown to force a snapshot
+    txManager.stopAndWait();
+
+    // Validate the snapshot on disk
+    TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
+    txStorage.startAndWait();
+
+    TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
+    TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
+    assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
+
+    Map<Long, TransactionManager.InProgressTx> inProgress = snapshot.getInProgress();
+    Assert.assertEquals(1, inProgress.size());
+
+    TransactionManager.InProgressTx inProgressTx = inProgress.get(transaction.getTransactionId());
+    Assert.assertNotNull(inProgressTx);
+    Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
+                             inProgressTx.getCheckpointWritePointers().toLongArray());
+
+    txStorage.stopAndWait();
+
+    // start a new Tx manager to see if the transaction is restored correctly.
+    Injector injector2 = Guice.createInjector(new ConfigModule(conf),
+                                              new DiscoveryModules().getSingleNodeModules(),
+                                              new TransactionModules().getSingleNodeModules());
+
+    txManager = injector2.getInstance(TransactionManager.class);
+    txManager.startAndWait();
+
+    // state should be recovered
+    snapshot = txManager.getCurrentState();
+    inProgress = snapshot.getInProgress();
+    Assert.assertEquals(1, inProgress.size());
+
+    inProgressTx = inProgress.get(transaction.getTransactionId());
+    Assert.assertNotNull(inProgressTx);
+    Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
+                             inProgressTx.getCheckpointWritePointers().toLongArray());
+
+    // Should be able to commit the transaction
+    Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList()));
+    Assert.assertTrue(txManager.commit(checkpointTx));
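+    // Committing with the checkpointed transaction commits the whole transaction, so the snapshot
+    // read back below should contain no in-progress entries.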
+
+    // save a new snapshot
+    txManager.stopAndWait();
+
+    TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
+    txStorage2.startAndWait();
+
+    snapshot = txStorage2.getLatestSnapshot();
+    Assert.assertTrue(snapshot.getInProgress().isEmpty());
+    txStorage2.stopAndWait();
+  }
+
+  private void assertTransactionVisibilityStateEquals(TransactionVisibilityState expected,
+                                                      TransactionVisibilityState input) {
+    Assert.assertEquals(expected.getTimestamp(), input.getTimestamp());
+    Assert.assertEquals(expected.getReadPointer(), input.getReadPointer());
+    Assert.assertEquals(expected.getWritePointer(), input.getWritePointer());
+    Assert.assertEquals(expected.getInProgress(), input.getInProgress());
+    Assert.assertEquals(expected.getInvalid(), input.getInvalid());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java b/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java
new file mode 100644
index 0000000..8526b75
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Common tests for HBase version detection and {@link ConfigurationFactory} behavior.
+ */
+public abstract class AbstractConfigurationProviderTest {
+  @Test
+  public void testVersionFactory() {
+    HBaseVersion.Version foundVersion = HBaseVersion.get();
+    assertEquals(getExpectedVersion(), foundVersion);
+  }
+
+  protected abstract HBaseVersion.Version getExpectedVersion();
+
+  @Test
+  public void testConfigurationProvider() {
+    Configuration conf = new Configuration();
+    conf.set("foo", "bar");
+    Configuration newConf = new ConfigurationFactory().get(conf);
+    assertNotNull(newConf);
+    assertEquals("bar", newConf.get("foo"));
+  }
+}