You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:39 UTC

[33/51] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java
new file mode 100644
index 0000000..4b9e646
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.Method;
+
+/**
+ * Utility for handling HDFS file lease recovery.  This is a copy-n-paste fork of
+ * {@link org.apache.hadoop.hbase.util.FSHDFSUtils} from the latest HBase 0.94 version (as of 0.94.12),
+ * which contains some additional fixes not present in our current HBase dependency version --
+ * mainly checking the return value of the {@code DistributedFileSystem.recoverLease()} call to verify that
+ * recovery succeeded.
+ */
+public class HDFSUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSUtil.class);
+  /**
+   * Recover the lease from HDFS, retrying multiple times.  Does nothing for non-HDFS file systems.
+   */
+  public void recoverFileLease(final FileSystem fs, final Path p,
+                               Configuration conf)
+    throws IOException {
+    // lease recovery not needed for local file system case.
+    if (!(fs instanceof DistributedFileSystem)) {
+      return;
+    }
+    recoverDFSFileLease((DistributedFileSystem) fs, p, conf);
+  }
+
+  /*
+   * Run the dfs recover lease. recoverLease is asynchronous. It returns:
+   *    - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
+   *    - true when the lease recovery has succeeded or the file is closed.
+   * But, we have to be careful.  Each time we call recoverLease, it starts the recover lease
+   * process over from the beginning.  We could put ourselves in a situation where we are
+   * doing nothing but starting a recovery, interrupting it to start again, and so on.
+   * The findings over in HBASE-8354 have it that the namenode will try to recover the lease
+   * on the file's primary node.  If all is well, it should return near immediately.  But,
+   * as is common, it is the very primary node that has crashed and so the namenode will be
+   * stuck waiting on a socket timeout before it will ask another datanode to start the
+   * recovery. It does not help if we call recoverLease in the meantime and in particular,
+   * subsequent to the socket timeout, a recoverLease invocation will cause us to start
+   * over from square one (possibly waiting on socket timeout against primary node).  So,
+   * in the below, we do the following:
+   * 1. Call recoverLease.
+   * 2. If it returns true, break.
+   * 3. If it returns false, wait a few seconds and then call it again.
+   * 4. If it returns true, break.
+   * 5. If it returns false, wait for what we think the datanode socket timeout is
+   * (configurable) and then try again.
+   * 6. If it returns true, break.
+   * 7. If it returns false, repeat starting at step 5. above.
+   *
+   * If HDFS-4525 is available, call it every second and we might be able to exit early.
+   * Returns true once the lease is recovered or the file is seen closed, false on timeout.
+   */
+  boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
+                              final Configuration conf)
+    throws IOException {
+    LOG.info("Recovering lease on dfs file " + p);
+    long startWaiting = System.currentTimeMillis();
+    // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
+    // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
+    // beyond that limit 'to be safe'.
+    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
+    // This setting should be what the cluster dfs heartbeat is set to.
+    long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 3000);
+    // This should be set to how long it'll take for us to timeout against primary datanode if it
+    // is dead.  We set it to 61 seconds, 1 second more than the default READ_TIMEOUT in HDFS, the
+    // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY.
+    long subsequentPause = conf.getInt("hbase.lease.recovery.dfs.timeout", 61 * 1000);
+
+    Method isFileClosedMeth = null;
+    // whether we need to look for isFileClosed method
+    boolean findIsFileClosedMeth = true;
+    boolean recovered = false;
+    // We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
+    for (int nbAttempt = 0; !recovered; nbAttempt++) {
+      recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
+      if (recovered || checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
+        break;
+      }
+      try {
+        // On the first time through wait the short 'firstPause'.
+        if (nbAttempt == 0) {
+          Thread.sleep(firstPause);
+        } else {
+          // Cycle here until subsequentPause elapses.  While spinning, check isFileClosed if
+          // available (should be in hadoop 2.0.5... not in hadoop 1 though).
+          long localStartWaiting = System.currentTimeMillis();
+          while ((System.currentTimeMillis() - localStartWaiting) < subsequentPause) {
+            Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
+            if (findIsFileClosedMeth) {
+              try {
+                isFileClosedMeth = dfs.getClass().getMethod("isFileClosed", Path.class);
+              } catch (NoSuchMethodException nsme) {
+                LOG.debug("isFileClosed not available");
+              } finally {
+                findIsFileClosedMeth = false;
+              }
+            }
+            if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
+              recovered = true;
+              break;
+            }
+          }
+        }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();  // keep the interruption visible to callers up the stack
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
+    return recovered;
+  }
+
+  /** Logs a warning and returns true once the {@code recoveryTimeout} deadline has passed. */
+  boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
+                          final int nbAttempt, final Path p, final long startWaiting) {
+    if (recoveryTimeout >= System.currentTimeMillis()) {
+      return false;
+    }
+    LOG.warn("Cannot recoverLease after trying for " + conf.getInt("hbase.lease.recovery.timeout", 900000) +
+               "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
+               getLogMessageDetail(nbAttempt, p, startWaiting));
+    return true;
+  }
+
+  /**
+   * Try to recover the lease.
+   * @param dfs The filesystem instance.
+   * @param nbAttempt Count number of this attempt.
+   * @param p Path of the file to recover.
+   * @param startWaiting Timestamp of when we started attempting to recover the file lease.
+   * @return True if dfs#recoverLease came by true; false if it returned false or failed with an IOException.
+   * @throws java.io.FileNotFoundException if the file does not exist
+   */
+  boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
+                       final long startWaiting)
+    throws FileNotFoundException {
+    boolean recovered = false;
+    try {
+      recovered = dfs.recoverLease(p);
+      LOG.info("recoverLease=" + recovered + ", " + getLogMessageDetail(nbAttempt, p, startWaiting));
+    } catch (FileNotFoundException e) {
+      throw e;
+    } catch (IOException e) {
+      String msg = e.getMessage();  // may be null, so guard before inspecting it
+      if (e instanceof LeaseExpiredException && msg != null && msg.contains("File does not exist")) {
+        // This exception comes out instead of FNFE, fix it (keeping the original as the cause)
+        throw (FileNotFoundException) new FileNotFoundException("The given file wasn't found at " + p).initCause(e);
+      }
+      LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
+    }
+    return recovered;
+  }
+
+  /**
+   * Builds the attempt/file/elapsed-time suffix shared by the lease recovery log messages.
+   * @param nbAttempt Attempt number for the lease recovery.
+   * @param p Path of the file to recover.
+   * @param startWaiting Timestamp of when we started attempting to recover the file lease.
+   * @return Detail to append to any log message around lease recovering.
+   */
+  private String getLogMessageDetail(final int nbAttempt, final Path p, final long startWaiting) {
+    return "attempt=" + nbAttempt + " on file=" + p + " after " + (System.currentTimeMillis() - startWaiting) + "ms";
+  }
+
+  /**
+   * Call HDFS-4525 isFileClosed if it is available.
+   * @param dfs Filesystem instance to use.
+   * @param m Method instance to call.
+   * @param p Path of the file to check is closed.
+   * @return True if file is closed; false if it is not or if the reflective call failed.
+   */
+  private boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) {
+    try {
+      return (Boolean) m.invoke(dfs, p);
+    } catch (SecurityException e) {
+      LOG.warn("No access", e);
+    } catch (Exception e) {
+      LOG.warn("Failed invocation for " + p.toString(), e);
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
new file mode 100644
index 0000000..d81ba38
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.tephra.metrics.MetricsCollector;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Reads and writes transaction logs against files in the local filesystem.
+ */
+public class LocalFileTransactionLog extends AbstractTransactionLog {
+  private final File logFile;
+
+  /**
+   * Creates a new transaction log using the given file instance.
+   * @param logFile The log file to use.
+   */
+  public LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector) {
+    super(timestamp, metricsCollector);
+    this.logFile = logFile;
+  }
+
+  @Override
+  public String getName() {
+    return logFile.getAbsolutePath();
+  }
+
+  @Override
+  protected TransactionLogWriter createWriter() throws IOException {
+    return new LogWriter(logFile);
+  }
+
+  @Override
+  public TransactionLogReader getReader() throws IOException {
+    return new LogReader(logFile);
+  }
+
+  private static final class LogWriter implements TransactionLogWriter {
+    private final FileOutputStream fos;
+    private final DataOutputStream out;
+
+    public LogWriter(File logFile) throws IOException {
+      this.fos = new FileOutputStream(logFile);
+      this.out = new DataOutputStream(new BufferedOutputStream(fos, LocalFileTransactionStateStorage.BUFFER_SIZE));
+    }
+
+    @Override
+    public void append(Entry entry) throws IOException {
+      entry.write(out);
+    }
+
+    @Override
+    public void commitMarker(int count) throws IOException {
+      // skip for local file
+    }
+
+    @Override
+    public void sync() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      out.flush();
+      out.close();
+      fos.close();
+    }
+  }
+
+  private static final class LogReader implements TransactionLogReader {
+    private final FileInputStream fin;
+    private final DataInputStream in;
+    private Entry reuseEntry = new Entry();
+
+    public LogReader(File logFile) throws IOException {
+      this.fin = new FileInputStream(logFile);
+      this.in = new DataInputStream(new BufferedInputStream(fin, LocalFileTransactionStateStorage.BUFFER_SIZE));
+    }
+
+    @Override
+    public TransactionEdit next() throws IOException {
+      Entry entry = new Entry();
+      try {
+        entry.readFields(in);
+      } catch (EOFException eofe) {
+        // signal end of file by returning null
+        return null;
+      }
+      return entry.getEdit();
+    }
+
+    @Override
+    public TransactionEdit next(TransactionEdit reuse) throws IOException {
+      try {
+        reuseEntry.getKey().readFields(in);
+        reuse.readFields(in);
+      } catch (EOFException eofe) {
+        // signal end of file by returning null
+        return null;
+      }
+      return reuse;
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+      fin.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
new file mode 100644
index 0000000..beddbb2
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import com.google.common.primitives.Longs;
+import com.google.inject.Inject;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Persists transaction snapshots and write-ahead logs to files on the local filesystem.
+ */
+public class LocalFileTransactionStateStorage extends AbstractTransactionStateStorage {
+  private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress.";
+  private static final String SNAPSHOT_FILE_PREFIX = "snapshot.";
+  private static final String LOG_FILE_PREFIX = "txlog.";
+  private static final Logger LOG = LoggerFactory.getLogger(LocalFileTransactionStateStorage.class);
+  static final int BUFFER_SIZE = 16384;
+
+  private static final FilenameFilter SNAPSHOT_FILE_FILTER = new FilenameFilter() {
+    @Override
+    public boolean accept(File file, String s) {
+      return s.startsWith(SNAPSHOT_FILE_PREFIX);
+    }
+  };
+
+  /** Maps a file to its simple name; shared by {@link #listSnapshots()} and {@link #listLogs()}. */
+  private static final Function<File, String> FILE_TO_NAME = new Function<File, String>() {
+    @Override
+    public String apply(@Nullable File input) {
+      return input.getName();
+    }
+  };
+
+  private final String configuredSnapshotDir;
+  private final MetricsCollector metricsCollector;
+  private File snapshotDir;
+
+  @Inject
+  public LocalFileTransactionStateStorage(Configuration conf, SnapshotCodecProvider codecProvider,
+                                          MetricsCollector metricsCollector) {
+    super(codecProvider);
+    this.configuredSnapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR);
+    this.metricsCollector = metricsCollector;
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    Preconditions.checkState(configuredSnapshotDir != null,
+        "Snapshot directory is not configured.  Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR +
+        " in configuration.");
+    snapshotDir = new File(configuredSnapshotDir);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // nothing to do
+  }
+
+  @Override
+  public String getLocation() {
+    return snapshotDir.getAbsolutePath();
+  }
+
+  @Override
+  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
+    // save the snapshot to a temporary file
+    File snapshotTmpFile = new File(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
+    LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile);
+    OutputStream out = Files.newOutputStreamSupplier(snapshotTmpFile).getOutput();
+    boolean threw = true;
+    try {
+      codecProvider.encode(out, snapshot);
+      threw = false;
+    } finally {
+      Closeables.close(out, threw);
+    }
+
+    // move the temporary file into place with the correct filename
+    File finalFile = new File(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
+    if (!snapshotTmpFile.renameTo(finalFile)) {
+      throw new IOException("Failed renaming temporary snapshot file " + snapshotTmpFile.getName() + " to " +
+          finalFile.getName());
+    }
+
+    LOG.debug("Completed snapshot to file {}", finalFile);
+  }
+
+  @Override
+  public TransactionSnapshot getLatestSnapshot() throws IOException {
+    InputStream is = getLatestSnapshotInputStream();
+    if (is == null) {
+      return null;
+    }
+    try {
+      return readSnapshotFile(is);
+    } finally {
+      is.close();
+    }
+  }
+
+  @Override
+  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
+    InputStream is = getLatestSnapshotInputStream();
+    if (is == null) {
+      return null;
+    }
+    try {
+      return codecProvider.decodeTransactionVisibilityState(is);
+    } finally {
+      is.close();
+    }
+  }
+
+  private InputStream getLatestSnapshotInputStream() throws IOException {
+    TimestampedFilename mostRecent = null;
+    for (File file : listFiles(SNAPSHOT_FILE_FILTER)) {
+      TimestampedFilename tsFile = new TimestampedFilename(file);
+      if (mostRecent == null || tsFile.compareTo(mostRecent) > 0) {
+        mostRecent = tsFile;
+      }
+    }
+
+    if (mostRecent == null) {
+      LOG.info("No snapshot files found in {}", snapshotDir.getAbsolutePath());
+      return null;
+    }
+
+    return new FileInputStream(mostRecent.getFile());
+  }
+
+  /** Lists the snapshot directory entries accepted by {@code filter}, failing rather than returning null. */
+  private File[] listFiles(FilenameFilter filter) throws IOException {
+    File[] files = snapshotDir.listFiles(filter);
+    if (files == null) {
+      throw new IOException("Unable to list files in directory " + snapshotDir.getAbsolutePath());
+    }
+    return files;
+  }
+
+  private TransactionSnapshot readSnapshotFile(InputStream is) throws IOException {
+    return codecProvider.decode(is);
+  }
+
+  @Override
+  public long deleteOldSnapshots(int numberToKeep) throws IOException {
+    File[] snapshotFiles = listFiles(SNAPSHOT_FILE_FILTER);
+    if (snapshotFiles.length == 0) {
+      return -1;
+    }
+    // fail before deleting anything: a non-positive count would index out of bounds further down
+    Preconditions.checkArgument(numberToKeep > 0, "numberToKeep must be positive but was %s", numberToKeep);
+    TimestampedFilename[] snapshotFilenames = new TimestampedFilename[snapshotFiles.length];
+    for (int i = 0; i < snapshotFiles.length; i++) {
+      snapshotFilenames[i] = new TimestampedFilename(snapshotFiles[i]);
+    }
+    Arrays.sort(snapshotFilenames, Collections.reverseOrder());
+    if (snapshotFilenames.length <= numberToKeep) {
+      // nothing to delete, just return the oldest timestamp
+      return snapshotFilenames[snapshotFilenames.length - 1].getTimestamp();
+    }
+    int toRemoveCount = snapshotFilenames.length - numberToKeep;
+    int removedCnt = 0;
+    for (int i = numberToKeep; i < snapshotFilenames.length; i++) {
+      File currentFile = snapshotFilenames[i].getFile();
+      LOG.debug("Removing old snapshot file {}", currentFile.getAbsolutePath());
+      if (currentFile.delete()) {
+        removedCnt++;
+      } else {
+        LOG.error("Failed deleting snapshot file {}", currentFile.getAbsolutePath());
+      }
+    }
+    long oldestTimestamp = snapshotFilenames[numberToKeep - 1].getTimestamp();
+    LOG.info("Removed {} out of {} expected snapshot files older than {}", removedCnt, toRemoveCount, oldestTimestamp);
+    return oldestTimestamp;
+  }
+
+  @Override
+  public List<String> listSnapshots() throws IOException {
+    return Lists.transform(Arrays.asList(listFiles(SNAPSHOT_FILE_FILTER)), FILE_TO_NAME);
+  }
+
+  @Override
+  public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
+    File[] logFiles = listFiles(new LogFileFilter(timestamp, Long.MAX_VALUE));
+    TimestampedFilename[] timestampedFiles = new TimestampedFilename[logFiles.length];
+    for (int i = 0; i < logFiles.length; i++) {
+      timestampedFiles[i] = new TimestampedFilename(logFiles[i]);
+    }
+    // logs need to be processed in ascending order
+    Arrays.sort(timestampedFiles);
+    // build the logs eagerly so repeated reads of the list see the same TransactionLog instances
+    List<TransactionLog> logs = Lists.newArrayListWithCapacity(timestampedFiles.length);
+    for (TimestampedFilename tsFile : timestampedFiles) {
+      logs.add(new LocalFileTransactionLog(tsFile.getFile(), tsFile.getTimestamp(), metricsCollector));
+    }
+    return logs;
+  }
+
+  @Override
+  public TransactionLog createLog(long timestamp) throws IOException {
+    File newLogFile = new File(snapshotDir, LOG_FILE_PREFIX + timestamp);
+    LOG.info("Creating new transaction log at {}", newLogFile.getAbsolutePath());
+    return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector);
+  }
+
+  @Override
+  public void deleteLogsOlderThan(long timestamp) throws IOException {
+    int removedCnt = 0;
+    for (File file : listFiles(new LogFileFilter(0, timestamp))) {
+      LOG.debug("Removing old transaction log {}", file.getPath());
+      if (file.delete()) {
+        removedCnt++;
+      } else {
+        LOG.warn("Failed to remove log file {}", file.getAbsolutePath());
+      }
+    }
+    LOG.debug("Removed {} transaction logs older than {}", removedCnt, timestamp);
+  }
+
+  @Override
+  public void setupStorage() throws IOException {
+    // create the directory if it doesn't exist
+    if (!snapshotDir.exists()) {
+      if (!snapshotDir.mkdirs()) {
+        throw new IOException("Failed to create directory " + configuredSnapshotDir +
+                                " for transaction snapshot storage");
+      }
+    } else {
+      Preconditions.checkState(snapshotDir.isDirectory(),
+                               "Configured snapshot directory " + configuredSnapshotDir + " is not a directory!");
+      Preconditions.checkState(snapshotDir.canWrite(), "Configured snapshot directory " +
+        configuredSnapshotDir + " exists but is not writable!");
+    }
+  }
+
+  @Override
+  public List<String> listLogs() throws IOException {
+    return Lists.transform(Arrays.asList(listFiles(new LogFileFilter(0, Long.MAX_VALUE))), FILE_TO_NAME);
+  }
+
+  private static class LogFileFilter implements FilenameFilter {
+    private final long startTime;
+    private final long endTime;
+
+    public LogFileFilter(long startTime, long endTime) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+
+    @Override
+    public boolean accept(File file, String s) {
+      if (s.startsWith(LOG_FILE_PREFIX)) {
+        String[] parts = s.split("\\.");
+        if (parts.length == 2) {
+          try {
+            long fileTime = Long.parseLong(parts[1]);
+            return fileTime >= startTime && fileTime < endTime;
+          } catch (NumberFormatException ignored) {
+            LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", s);
+          }
+        }
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Represents a filename composed of a prefix and a ".timestamp" suffix.  This is useful for manipulating both
+   * snapshot and transaction log filenames.
+   */
+  private static class TimestampedFilename implements Comparable<TimestampedFilename> {
+    private final File file;
+    private final String prefix;
+    private final long timestamp;
+
+    public TimestampedFilename(File file) {
+      this.file = file;
+      String[] parts = file.getName().split("\\.");
+      if (parts.length != 2) {
+        throw new IllegalArgumentException("Filename " + file.getName() +
+                                           " did not match the expected pattern prefix.timestamp");
+      }
+      prefix = parts[0];
+      timestamp = Long.parseLong(parts[1]);
+    }
+
+    public File getFile() {
+      return file;
+    }
+
+    public String getPrefix() {
+      return prefix;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public int compareTo(TimestampedFilename other) {
+      int res = prefix.compareTo(other.getPrefix());
+      if (res == 0) {
+        res = Longs.compare(timestamp, other.getTimestamp());
+      }
+      return res;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java
new file mode 100644
index 0000000..12f2475
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/NoOpTransactionStateStorage.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.inject.Inject;
+import org.apache.tephra.snapshot.SnapshotCodec;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Minimal {@link TransactionStateStorage} that does nothing, i.e. does not maintain any actual state. */
+public class NoOpTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
+
+  private final SnapshotCodec snapshotCodec;
+
+  @Inject
+  public NoOpTransactionStateStorage(SnapshotCodecProvider codecProvider) {
+    this.snapshotCodec = codecProvider;
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+  }
+
+  @Override
+  public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
+    snapshotCodec.encode(out, snapshot);
+  }
+
+  @Override
+  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
+  }
+
+  @Override
+  public TransactionSnapshot getLatestSnapshot() throws IOException {
+    return null;
+  }
+
+  @Override
+  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
+    return null;
+  }
+
+  @Override
+  public long deleteOldSnapshots(int numberToKeep) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public List<String> listSnapshots() throws IOException {
+    return new ArrayList<>(0);
+  }
+
+  @Override
+  public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
+    return new ArrayList<>(0);
+  }
+
+  @Override
+  public TransactionLog createLog(long timestamp) throws IOException {
+    return new NoOpTransactionLog();
+  }
+
+  @Override
+  public void deleteLogsOlderThan(long timestamp) throws IOException {
+  }
+
+  @Override
+  public void setupStorage() throws IOException {
+  }
+
+  @Override
+  public List<String> listLogs() throws IOException {
+    return new ArrayList<>(0);
+  }
+
+  @Override
+  public String getLocation() {
+    return "no-op";
+  }
+
+  private static class NoOpTransactionLog implements TransactionLog {
+    private final long timestamp = System.currentTimeMillis();
+
+    @Override
+    public String getName() {
+      return "no-op";
+    }
+
+    @Override
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public void append(TransactionEdit edit) throws IOException {
+    }
+
+    @Override
+    public void append(List<TransactionEdit> edits) throws IOException {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public TransactionLogReader getReader() {
+      return new NoOpTransactionLogReader();
+    }
+  }
+
+  private static class NoOpTransactionLogReader implements TransactionLogReader {
+    @Override
+    public TransactionEdit next() {
+      return null;
+    }
+
+    @Override
+    public TransactionEdit next(TransactionEdit reuse) {
+      return null;
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
new file mode 100644
index 0000000..1d07e72
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.io.Writable;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Represents a transaction state change in the {@link TransactionLog}.
+ *
+ * <p>Instances are mutable so that they can be populated in place by {@link #readFields(DataInput)}.  Use the
+ * static {@code create*} factory methods to build an edit for a specific {@link State}.
+ */
+public class TransactionEdit implements Writable {
+
+  /**
+   * The possible state changes for a transaction.
+   */
+  public enum State {
+    INPROGRESS, COMMITTING, COMMITTED, INVALID, ABORTED, MOVE_WATERMARK, TRUNCATE_INVALID_TX, CHECKPOINT
+  }
+
+  private long writePointer;
+
+  /**
+   * stores the value of visibility upper bound
+   * (see {@link TransactionManager.InProgressTx#getVisibilityUpperBound()})
+   * for edit of {@link State#INPROGRESS} only
+   */
+  private long visibilityUpperBound;
+  private long commitPointer;
+  private long expirationDate;
+  private State state;
+  private Set<ChangeId> changes;
+  /** Whether or not the COMMITTED change should be fully committed. */
+  private boolean canCommit;
+  private TransactionType type;
+  private Set<Long> truncateInvalidTx;
+  private long truncateInvalidTxTime;
+  private long parentWritePointer;
+  private long[] checkpointPointers;
+
+  // for Writable
+  public TransactionEdit() {
+    this.changes = Sets.newHashSet();
+    this.truncateInvalidTx = Sets.newHashSet();
+  }
+
+  // package private for testing
+  TransactionEdit(long writePointer, long visibilityUpperBound, State state, long expirationDate,
+                  Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type,
+                  Set<Long> truncateInvalidTx, long truncateInvalidTxTime, long parentWritePointer,
+                  long[] checkpointPointers) {
+    this.writePointer = writePointer;
+    this.visibilityUpperBound = visibilityUpperBound;
+    this.state = state;
+    this.expirationDate = expirationDate;
+    // null collections are normalized to (immutable) empty sets so the getters never return null here
+    this.changes = changes != null ? changes : Collections.<ChangeId>emptySet();
+    this.commitPointer = commitPointer;
+    this.canCommit = canCommit;
+    this.type = type;
+    this.truncateInvalidTx = truncateInvalidTx != null ? truncateInvalidTx : Collections.<Long>emptySet();
+    this.truncateInvalidTxTime = truncateInvalidTxTime;
+    this.parentWritePointer = parentWritePointer;
+    // may legitimately be null; the array is stored as passed, without copying
+    this.checkpointPointers = checkpointPointers;
+  }
+
+  /**
+   * Returns the transaction write pointer assigned for the state change.
+   */
+  public long getWritePointer() {
+    return writePointer;
+  }
+
+  void setWritePointer(long writePointer) {
+    this.writePointer = writePointer;
+  }
+
+  /**
+   * Returns the visibility upper bound recorded with the edit.  This is only populated for edits of type
+   * {@link State#INPROGRESS}.
+   */
+  public long getVisibilityUpperBound() {
+    return visibilityUpperBound;
+  }
+
+  void setVisibilityUpperBound(long visibilityUpperBound) {
+    this.visibilityUpperBound = visibilityUpperBound;
+  }
+
+  /**
+   * Returns the type of state change represented.
+   */
+  public State getState() {
+    return state;
+  }
+
+  void setState(State state) {
+    this.state = state;
+  }
+
+  /**
+   * Returns any expiration timestamp (in milliseconds) associated with the state change.  This should only
+   * be populated for changes of type {@link State#INPROGRESS}.
+   */
+  public long getExpiration() {
+    return expirationDate;
+  }
+
+  void setExpiration(long expirationDate) {
+    this.expirationDate = expirationDate;
+  }
+
+  /**
+   * @return the set of changed row keys associated with the state change.  This is only populated for edits
+   * of type {@link State#COMMITTING} or {@link State#COMMITTED}.
+   */
+  public Set<ChangeId> getChanges() {
+    return changes;
+  }
+
+  void setChanges(Set<ChangeId> changes) {
+    this.changes = changes;
+  }
+
+  /**
+   * Returns the write pointer used to commit the row key change set.  This is only populated for edits of type
+   * {@link State#COMMITTED}.
+   */
+  public long getCommitPointer() {
+    return commitPointer;
+  }
+
+  void setCommitPointer(long commitPointer) {
+    this.commitPointer = commitPointer;
+  }
+
+  /**
+   * Returns whether or not the transaction should be moved to the committed set.  This is only populated for edits
+   * of type {@link State#COMMITTED}.
+   */
+  public boolean getCanCommit() {
+    return canCommit;
+  }
+
+  void setCanCommit(boolean canCommit) {
+    this.canCommit = canCommit;
+  }
+
+  /**
+   * Returns the transaction type. This is only populated for edits of type {@link State#INPROGRESS} or
+   * {@link State#ABORTED}.
+   */
+  public TransactionType getType() {
+    return type;
+  }
+
+  void setType(TransactionType type) {
+    this.type = type;
+  }
+
+  /**
+   * Returns the transaction ids to be removed from invalid transaction list. This is only populated for
+   * edits of type {@link State#TRUNCATE_INVALID_TX}
+   */
+  public Set<Long> getTruncateInvalidTx() {
+    return truncateInvalidTx;
+  }
+
+  void setTruncateInvalidTx(Set<Long> truncateInvalidTx) {
+    this.truncateInvalidTx = truncateInvalidTx;
+  }
+
+  /**
+   * Returns the time until which the invalid transactions need to be truncated from invalid transaction list.
+   * This is only populated for  edits of type {@link State#TRUNCATE_INVALID_TX}
+   */
+  public long getTruncateInvalidTxTime() {
+    return truncateInvalidTxTime;
+  }
+
+  void setTruncateInvalidTxTime(long truncateInvalidTxTime) {
+    this.truncateInvalidTxTime = truncateInvalidTxTime;
+  }
+
+  /**
+   * Returns the parent write pointer for a checkpoint operation.  This is only populated for edits of type
+   * {@link State#CHECKPOINT}
+   */
+  public long getParentWritePointer() {
+    return parentWritePointer;
+  }
+
+  void setParentWritePointer(long parentWritePointer) {
+    this.parentWritePointer = parentWritePointer;
+  }
+
+  /**
+   * Returns the checkpoint write pointers for the edit.  This is only populated for edits of type
+   * {@link State#ABORTED}.  May be {@code null}; the internal array is returned without copying.
+   */
+  public long[] getCheckpointPointers() {
+    return checkpointPointers;
+  }
+
+  void setCheckpointPointers(long[] checkpointPointers) {
+    this.checkpointPointers = checkpointPointers;
+  }
+
+  /**
+   * Creates a new instance in the {@link State#INPROGRESS} state.
+   */
+  public static TransactionEdit createStarted(long writePointer, long visibilityUpperBound,
+                                              long expirationDate, TransactionType type) {
+    return new TransactionEdit(writePointer, visibilityUpperBound, State.INPROGRESS,
+                               expirationDate, null, 0L, false, type, null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#COMMITTING} state.
+   */
+  public static TransactionEdit createCommitting(long writePointer, Set<ChangeId> changes) {
+    return new TransactionEdit(writePointer, 0L, State.COMMITTING, 0L, changes, 0L, false, null, null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#COMMITTED} state.
+   */
+  public static TransactionEdit createCommitted(long writePointer, Set<ChangeId> changes, long nextWritePointer,
+                                                boolean canCommit) {
+    return new TransactionEdit(writePointer, 0L, State.COMMITTED, 0L, changes, nextWritePointer, canCommit, null,
+                               null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#ABORTED} state.
+   */
+  public static TransactionEdit createAborted(long writePointer, TransactionType type, long[] checkpointPointers) {
+    return new TransactionEdit(writePointer, 0L, State.ABORTED, 0L, null, 0L, false, type, null, 0L, 0L,
+        checkpointPointers);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#INVALID} state.
+   */
+  public static TransactionEdit createInvalid(long writePointer) {
+    return new TransactionEdit(writePointer, 0L, State.INVALID, 0L, null, 0L, false, null, null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#MOVE_WATERMARK} state.
+   */
+  public static TransactionEdit createMoveWatermark(long writePointer) {
+    return new TransactionEdit(writePointer, 0L, State.MOVE_WATERMARK, 0L, null, 0L, false, null, null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state that carries an explicit set of
+   * transaction ids.
+   */
+  public static TransactionEdit createTruncateInvalidTx(Set<Long> truncateInvalidTx) {
+    return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, truncateInvalidTx,
+        0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state that carries a cut-off time instead of
+   * an explicit set of transaction ids.
+   */
+  public static TransactionEdit createTruncateInvalidTxBefore(long truncateInvalidTxTime) {
+    return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, null,
+                               truncateInvalidTxTime, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#CHECKPOINT} state.
+   */
+  public static TransactionEdit createCheckpoint(long writePointer, long parentWritePointer) {
+    return new TransactionEdit(writePointer, 0L, State.CHECKPOINT, 0L, null, 0L, false, null, null, 0L,
+        parentWritePointer, null);
+  }
+
+  /**
+   * Serializes this edit by delegating to {@link TransactionEditCodecs#encode(TransactionEdit, DataOutput)}.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    TransactionEditCodecs.encode(this, out);
+  }
+
+  /**
+   * Populates this edit in place by delegating to {@link TransactionEditCodecs#decode(TransactionEdit, DataInput)}.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    TransactionEditCodecs.decode(this, in);
+  }
+
+  /**
+   * Two edits are equal when all of their fields are equal; {@code checkpointPointers} is compared by content.
+   */
+  @Override
+  public final boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TransactionEdit)) {
+      return false;
+    }
+
+    TransactionEdit that = (TransactionEdit) o;
+
+    return Objects.equal(this.writePointer, that.writePointer) &&
+        Objects.equal(this.visibilityUpperBound, that.visibilityUpperBound) &&
+        Objects.equal(this.commitPointer, that.commitPointer) &&
+        Objects.equal(this.expirationDate, that.expirationDate) &&
+        Objects.equal(this.state, that.state) &&
+        Objects.equal(this.changes, that.changes) &&
+        Objects.equal(this.canCommit, that.canCommit) &&
+        Objects.equal(this.type, that.type) &&
+        Objects.equal(this.truncateInvalidTx, that.truncateInvalidTx) &&
+        Objects.equal(this.truncateInvalidTxTime, that.truncateInvalidTxTime) &&
+        Objects.equal(this.parentWritePointer, that.parentWritePointer) &&
+        Arrays.equals(this.checkpointPointers, that.checkpointPointers);
+  }
+
+  /**
+   * Computes a hash over the same fields that {@link #equals(Object)} compares.
+   */
+  @Override
+  public final int hashCode() {
+    // The array must be hashed by content (Arrays.hashCode): passing the long[] itself would use its identity
+    // hash, so two edits that are equal per equals() could end up with different hash codes.
+    return Objects.hashCode(writePointer, visibilityUpperBound, commitPointer, expirationDate, state, changes,
+                            canCommit, type, truncateInvalidTx, truncateInvalidTxTime, parentWritePointer,
+                            Arrays.hashCode(checkpointPointers));
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("writePointer", writePointer)
+      .add("visibilityUpperBound", visibilityUpperBound)
+      .add("commitPointer", commitPointer)
+      .add("expiration", expirationDate)
+      .add("state", state)
+      // only the size is printed, not the individual change ids
+      .add("changesSize", changes != null ? changes.size() : 0)
+      .add("canCommit", canCommit)
+      .add("type", type)
+      .add("truncateInvalidTx", truncateInvalidTx)
+      .add("truncateInvalidTxTime", truncateInvalidTxTime)
+      .add("parentWritePointer", parentWritePointer)
+      // render the array contents rather than the default "[J@<hash>" form; handles null as "null"
+      .add("checkpointPointers", Arrays.toString(checkpointPointers))
+      .toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java
new file mode 100644
index 0000000..387ad41
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEditCodecs.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Utilities to handle encoding and decoding of {@link TransactionEdit} entries, while maintaining compatibility
+ * with older versions of the serialized data.
+ */
+public class TransactionEditCodecs {
+
+  private static final TransactionEditCodec[] ALL_CODECS = {
+      new TransactionEditCodecV1(),
+      new TransactionEditCodecV2(),
+      new TransactionEditCodecV3(),
+      new TransactionEditCodecV4()
+  };
+
+  // Keyed by codec version.  Versions are negative and decrease with each new codec, so the first (lowest) key
+  // of the sorted map is always the newest codec.
+  private static final SortedMap<Byte, TransactionEditCodec> CODECS = new TreeMap<>();
+  static {
+    for (TransactionEditCodec codec : ALL_CODECS) {
+      CODECS.put(codec.getVersion(), codec);
+    }
+  }
+
+  /**
+   * Deserializes the encoded data from the given input stream, setting the values as fields
+   * on the given {@code TransactionEdit} instance.  This method expects first value in the
+   * {@code DataInput} to be a byte representing the codec version used to serialize the instance.
+   *
+   * @param dest the transaction edit to populate with the deserialized values
+   * @param in the input stream containing the encoded data
+   * @throws IOException if an error occurs while deserializing from the input stream, or if the codec version
+   *                     read from the stream is not known
+   */
+  public static void decode(TransactionEdit dest, DataInput in) throws IOException {
+    byte version = in.readByte();
+    TransactionEditCodec codec = CODECS.get(version);
+    if (codec == null) {
+      throw new IOException("TransactionEdit was serialized with an unknown codec version " + version +
+          ". Was it written with a newer version of Tephra?");
+    }
+    codec.decode(dest, in);
+  }
+
+  /**
+   * Serializes the given {@code TransactionEdit} instance with the latest available codec.
+   * This will first write out the version of the codec used to serialize the instance so that
+   * the correct codec can be used when calling {@link #decode(TransactionEdit, DataInput)}.
+   *
+   * @param src the transaction edit to serialize
+   * @param out the output stream to contain the data
+   * @throws IOException if an error occurs while serializing to the output stream
+   */
+  public static void encode(TransactionEdit src, DataOutput out) throws IOException {
+    // lowest version number == newest codec (see TransactionEditCodec#getVersion())
+    TransactionEditCodec latestCodec = CODECS.get(CODECS.firstKey());
+    out.writeByte(latestCodec.getVersion());
+    latestCodec.encode(src, out);
+  }
+
+  /**
+   * Encodes the given transaction edit using a specific codec.  Note that this is only exposed
+   * for use by tests.
+   */
+  @VisibleForTesting
+  static void encode(TransactionEdit src, DataOutput out, TransactionEditCodec codec) throws IOException {
+    out.writeByte(codec.getVersion());
+    codec.encode(src, out);
+  }
+
+  /**
+   * Defines the interface used for encoding and decoding {@link TransactionEdit} instances to and from
+   * binary representations.
+   */
+  interface TransactionEditCodec {
+    /**
+     * Reads the encoded values from the data input stream and sets the fields in the given {@code TransactionEdit}
+     * instance.
+     *
+     * @param dest the instance on which to set all the deserialized values
+     * @param in the input stream containing the serialized data
+     * @throws IOException if an error occurs while deserializing the data
+     */
+    void decode(TransactionEdit dest, DataInput in) throws IOException;
+
+    /**
+     * Writes all the field values from the {@code TransactionEdit} instance in serialized form to the data
+     * output stream.
+     *
+     * @param src the instance to serialize to the stream
+     * @param out the output stream to contain the data
+     * @throws IOException if an error occurs while serializing the data
+     */
+    void encode(TransactionEdit src, DataOutput out) throws IOException;
+
+    /**
+     * Returns the version number for this codec.  Each codec should use a unique version number, with the newest
+     * codec having the lowest number.
+     */
+    byte getVersion();
+  }
+
+
+  // package-private for unit-test access
+  static class TransactionEditCodecV1 implements TransactionEditCodec {
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      dest.setWritePointer(in.readLong());
+      int stateIdx = in.readInt();
+      try {
+        dest.setState(TransactionEdit.State.values()[stateIdx]);
+      } catch (ArrayIndexOutOfBoundsException e) {
+        throw new IOException("State enum ordinal value is out of range: " + stateIdx);
+      }
+      dest.setExpiration(in.readLong());
+      dest.setCommitPointer(in.readLong());
+      dest.setCanCommit(in.readBoolean());
+      int changeSize = in.readInt();
+      Set<ChangeId> changes = Sets.newHashSet();
+      for (int i = 0; i < changeSize; i++) {
+        int currentLength = in.readInt();
+        if (currentLength < 0) {
+          // corrupt input: report it as an IOException instead of an unchecked NegativeArraySizeException
+          throw new IOException("Change key length is negative: " + currentLength);
+        }
+        byte[] currentBytes = new byte[currentLength];
+        in.readFully(currentBytes);
+        changes.add(new ChangeId(currentBytes));
+      }
+      dest.setChanges(changes);
+      // 1st version did not store this info. It is safe to set firstInProgress to 0, it may decrease performance until
+      // this tx is finished, but correctness will be preserved.
+      dest.setVisibilityUpperBound(0);
+    }
+
+    /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for
+     *  unit-tests only */
+    @Override
+    @Deprecated
+    public void encode(TransactionEdit src, DataOutput out) throws IOException {
+      out.writeLong(src.getWritePointer());
+      // use ordinal for predictable size, though this does not support evolution
+      out.writeInt(src.getState().ordinal());
+      out.writeLong(src.getExpiration());
+      out.writeLong(src.getCommitPointer());
+      out.writeBoolean(src.getCanCommit());
+      Set<ChangeId> changes = src.getChanges();
+      if (changes == null) {
+        out.writeInt(0);
+      } else {
+        out.writeInt(changes.size());
+        for (ChangeId c : changes) {
+          byte[] cKey = c.getKey();
+          out.writeInt(cKey.length);
+          out.write(cKey);
+        }
+      }
+      // NOTE: we didn't have visibilityUpperBound in V1, it was added in V2
+      // we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
+      // it was added in V3
+    }
+
+    @Override
+    public byte getVersion() {
+      return -1;
+    }
+  }
+
+  // package-private for unit-test access
+  static class TransactionEditCodecV2 extends TransactionEditCodecV1 implements TransactionEditCodec {
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      super.decode(dest, in);
+      // overrides the 0 that the V1 decoder set as a placeholder
+      dest.setVisibilityUpperBound(in.readLong());
+    }
+
+    /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for
+     *  unit-tests only */
+    @Override
+    public void encode(TransactionEdit src, DataOutput out) throws IOException {
+      super.encode(src, out);
+      out.writeLong(src.getVisibilityUpperBound());
+      // NOTE: we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
+      // it was added in V3
+    }
+
+    @Override
+    public byte getVersion() {
+      return -2;
+    }
+  }
+
+  // TODO: refactor to avoid duplicate code among different version of codecs
+  // package-private for unit-test access
+  static class TransactionEditCodecV3 extends TransactionEditCodecV2 implements TransactionEditCodec {
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      super.decode(dest, in);
+      int typeIdx = in.readInt();
+      // null transaction type is represented as -1
+      if (typeIdx < 0) {
+        dest.setType(null);
+      } else {
+        try {
+          dest.setType(TransactionType.values()[typeIdx]);
+        } catch (ArrayIndexOutOfBoundsException e) {
+          throw new IOException("Type enum ordinal value is out of range: " + typeIdx);
+        }
+      }
+
+      int truncateInvalidTxSize = in.readInt();
+      Set<Long> truncateInvalidTx = emptySet(dest.getTruncateInvalidTx());
+      for (int i = 0; i < truncateInvalidTxSize; i++) {
+        truncateInvalidTx.add(in.readLong());
+      }
+      dest.setTruncateInvalidTx(truncateInvalidTx);
+      dest.setTruncateInvalidTxTime(in.readLong());
+    }
+
+    /**
+     * Returns an empty set to fill: the given set after clearing it in place, or a new set if it is {@code null}.
+     * NOTE(review): the given set is reused as-is, so this presumably relies on {@code dest} holding a mutable
+     * set -- confirm for instances not created through the no-arg constructor.
+     */
+    private <T> Set<T> emptySet(Set<T> set) {
+      if (set == null) {
+        return Sets.newHashSet();
+      }
+      set.clear();
+      return set;
+    }
+
+    @Override
+    public void encode(TransactionEdit src, DataOutput out) throws IOException {
+      super.encode(src, out);
+      // null transaction type is represented as -1
+      if (src.getType() == null) {
+        out.writeInt(-1);
+      } else {
+        out.writeInt(src.getType().ordinal());
+      }
+
+      Set<Long> truncateInvalidTx = src.getTruncateInvalidTx();
+      if (truncateInvalidTx == null) {
+        out.writeInt(0);
+      } else {
+        out.writeInt(truncateInvalidTx.size());
+        for (long id : truncateInvalidTx) {
+          out.writeLong(id);
+        }
+      }
+      out.writeLong(src.getTruncateInvalidTxTime());
+    }
+
+    @Override
+    public byte getVersion() {
+      return -3;
+    }
+  }
+
+  static class TransactionEditCodecV4 extends TransactionEditCodecV3 {
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      super.decode(dest, in);
+      dest.setParentWritePointer(in.readLong());
+      int checkpointPointersLen = in.readInt();
+      if (checkpointPointersLen < 0) {
+        // A negative length encodes a null array (see encode).  Set it explicitly so that a reused
+        // TransactionEdit does not keep the checkpoint pointers of the entry decoded before this one.
+        dest.setCheckpointPointers(null);
+        return;
+      }
+      long[] checkpointPointers = new long[checkpointPointersLen];
+      for (int i = 0; i < checkpointPointersLen; i++) {
+        checkpointPointers[i] = in.readLong();
+      }
+      dest.setCheckpointPointers(checkpointPointers);
+    }
+
+    @Override
+    public void encode(TransactionEdit src, DataOutput out) throws IOException {
+      super.encode(src, out);
+      out.writeLong(src.getParentWritePointer());
+      long[] checkpointPointers = src.getCheckpointPointers();
+      if (checkpointPointers == null) {
+        // null array is represented as a length of -1
+        out.writeInt(-1);
+      } else {
+        out.writeInt(checkpointPointers.length);
+        for (long checkpointPointer : checkpointPointers) {
+          out.writeLong(checkpointPointer);
+        }
+      }
+    }
+
+    @Override
+    public byte getVersion() {
+      return -4;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java
new file mode 100644
index 0000000..e453523
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLog.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Represents a log of transaction state changes.
+ */
+public interface TransactionLog {
+
+  /**
+   * Returns the name of this log.
+   */
+  String getName();
+
+  /**
+   * Returns the timestamp associated with this log.
+   * NOTE(review): presumably the creation time in milliseconds -- confirm against the implementations.
+   */
+  long getTimestamp();
+
+  /**
+   * Appends a single transaction edit to this log.
+   *
+   * @param edit the edit to append
+   * @throws IOException if an error occurs while appending the edit
+   */
+  void append(TransactionEdit edit) throws IOException;
+
+  /**
+   * Appends a list of transaction edits to this log.
+   *
+   * @param edits the edits to append
+   * @throws IOException if an error occurs while appending the edits
+   */
+  void append(List<TransactionEdit> edits) throws IOException;
+
+  /**
+   * Closes this log.
+   *
+   * @throws IOException if an error occurs while closing the log
+   */
+  void close() throws IOException;
+
+  /**
+   * Returns a {@link TransactionLogReader} for reading the edits of this log.
+   *
+   * @throws IOException if an error occurs while creating the reader
+   */
+  TransactionLogReader getReader() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java
new file mode 100644
index 0000000..51fda0b
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Represents a reader for {@link TransactionLog} instances.
+ */
+public interface TransactionLogReader extends Closeable {
+  /**
+   * Returns the next {@code TransactionEdit} from the log file, based on the current position, or {@code null}
+   * if the end of the file has been reached.
+   *
+   * @return The next {@code TransactionEdit}, or {@code null} if the end of the file has been reached.
+   * @throws IOException If an error is encountered reading the log data.
+   */
+  TransactionEdit next() throws IOException;
+
+  /**
+   * Populates {@code reuse} with the next {@code TransactionEdit}, based on the reader's current position in the
+   * log file.
+   * @param reuse The {@code TransactionEdit} instance to populate with the log entry data.
+   * @return The {@code TransactionEdit} instance, or {@code null} if the end of the file has been reached.
+   * @throws IOException If an error is encountered reading the log data.
+   */
+  TransactionEdit next(TransactionEdit reuse) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
new file mode 100644
index 0000000..14893ac
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Common interface for transaction log writers used by classes extending {@link AbstractTransactionLog}.
+ */
+public interface TransactionLogWriter extends Closeable {
+  /**
+   * Adds a new transaction entry to the log.  Note that this does not guarantee that the entry has been flushed
+   * to persistent storage until {@link #sync()} has been called.
+   *
+   * @param entry The transaction edit to append.
+   * @throws IOException If an error occurs while writing the edit to storage.
+   */
+  void append(AbstractTransactionLog.Entry entry) throws IOException;
+
+  /**
+   * Writes a marker recording the number of transaction entries that will follow it in the log as part of a
+   * single sync.
+   *
+   * @param count Number of transaction entries.
+   * @throws IOException If an error occurs while writing the count to storage.
+   */
+  void commitMarker(int count) throws IOException;
+
+  /**
+   * Syncs any pending transaction edits added through {@link #append(AbstractTransactionLog.Entry)},
+   * but not yet flushed to durable storage.
+   *
+   * @throws IOException If an error occurs while flushing the outstanding edits.
+   */
+  void sync() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
new file mode 100644
index 0000000..ccf7374
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionManager;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Represents an in-memory snapshot of the full transaction state.
+ */
+public class TransactionSnapshot implements TransactionVisibilityState {
+  private long timestamp;
+  private long readPointer;
+  private long writePointer;
+  private Collection<Long> invalid;
+  private NavigableMap<Long, TransactionManager.InProgressTx> inProgress;
+  private Map<Long, Set<ChangeId>> committingChangeSets;
+  private Map<Long, Set<ChangeId>> committedChangeSets;
+
+  public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
+                             NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
+                             Map<Long, Set<ChangeId>> committing, Map<Long, Set<ChangeId>> committed) {
+    this(timestamp, readPointer, writePointer, invalid, inProgress);
+    this.committingChangeSets = committing;
+    this.committedChangeSets = committed;
+  }
+
+  public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
+                             NavigableMap<Long, TransactionManager.InProgressTx> inProgress) {
+    this.timestamp = timestamp;
+    this.readPointer = readPointer;
+    this.writePointer = writePointer;
+    this.invalid = invalid;
+    this.inProgress = inProgress;
+    this.committingChangeSets = Collections.emptyMap();
+    this.committedChangeSets = Collections.emptyMap();
+  }
+
+  @Override
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public long getReadPointer() {
+    return readPointer;
+  }
+
+  @Override
+  public long getWritePointer() {
+    return writePointer;
+  }
+
+  @Override
+  public Collection<Long> getInvalid() {
+    return invalid;
+  }
+
+  @Override
+  public NavigableMap<Long, TransactionManager.InProgressTx> getInProgress() {
+    return inProgress;
+  }
+
+  @Override
+  public long getVisibilityUpperBound() {
+    // the readPointer of the oldest in-progress tx is the oldest in use
+    // todo: potential problem with not moving visibility upper bound for the whole duration of long-running tx
+    Map.Entry<Long, TransactionManager.InProgressTx> firstInProgress = inProgress.firstEntry();
+    if (firstInProgress == null) {
+      // using readPointer as smallest visible when no txs are in progress
+      return readPointer;
+    }
+    return firstInProgress.getValue().getVisibilityUpperBound();
+  }
+
+  /**
+   * Returns a map of transaction write pointer to sets of changed row keys for transactions that had called
+   * {@code InMemoryTransactionManager.canCommit(Transaction, Collection)} but not yet called
+   * {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot.
+   *
+   * @return a map of transaction write pointer to set of changed row keys.
+   */
+  public Map<Long, Set<ChangeId>> getCommittingChangeSets() {
+    return committingChangeSets;
+  }
+
+  /**
+   * Returns a map of transaction write pointer to set of changed row keys for transactions that had successfully called
+   * {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot.
+   *
+   * @return a map of transaction write pointer to set of changed row keys.
+   */
+  public Map<Long, Set<ChangeId>> getCommittedChangeSets() {
+    return committedChangeSets;
+  }
+
+  /**
+   * Checks that this instance matches another {@code TransactionSnapshot} instance.  Note that the equality check
+   * ignores the snapshot timestamp value, but includes all other properties.
+   *
+   * @param obj the other instance to check for equality.
+   * @return {@code true} if the instances are equal, {@code false} if not.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof  TransactionSnapshot)) {
+      return false;
+    }
+    TransactionSnapshot other = (TransactionSnapshot) obj;
+    return readPointer == other.readPointer &&
+      writePointer == other.writePointer &&
+      invalid.equals(other.invalid) &&
+      inProgress.equals(other.inProgress) &&
+      committingChangeSets.equals(other.committingChangeSets) &&
+      committedChangeSets.equals(other.committedChangeSets);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("timestamp", timestamp)
+        .add("readPointer", readPointer)
+        .add("writePointer", writePointer)
+        .add("invalidSize", invalid.size())
+        .add("inProgressSize", inProgress.size())
+        .add("committingSize", committingChangeSets.size())
+        .add("committedSize", committedChangeSets.size())
+        .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(readPointer, writePointer, invalid, inProgress, committingChangeSets, committedChangeSets);
+  }
+
+  /**
+   * Creates a new {@code TransactionSnapshot} instance with copies of all of the individual collections.
+   * @param readPointer current transaction read pointer
+   * @param writePointer current transaction write pointer
+   * @param invalid current list of invalid write pointers
+   * @param inProgress current map of write pointers to in-progress transactions
+   * @param committing current map of write pointers to change sets which have passed {@code canCommit()} but not
+   *                   yet committed
+   * @param committed current map of write pointers to change sets which have committed
+   * @return a new {@code TransactionSnapshot} instance
+   */
+  public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer,
+                                             long writePointer, Collection<Long> invalid,
+                                             NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
+                                             Map<Long, Set<ChangeId>> committing,
+                                             NavigableMap<Long, Set<ChangeId>> committed) {
+    // copy invalid IDs
+    Collection<Long> invalidCopy = Lists.newArrayList(invalid);
+    // copy in-progress IDs and expirations
+    NavigableMap<Long, TransactionManager.InProgressTx> inProgressCopy = Maps.newTreeMap(inProgress);
+
+    // for committing and committed maps, we need to copy each individual Set as well to prevent modification
+    Map<Long, Set<ChangeId>> committingCopy = Maps.newHashMap();
+    for (Map.Entry<Long, Set<ChangeId>> entry : committing.entrySet()) {
+      committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
+    }
+
+    NavigableMap<Long, Set<ChangeId>> committedCopy = new TreeMap<>();
+    for (Map.Entry<Long, Set<ChangeId>> entry : committed.entrySet()) {
+      committedCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
+    }
+
+    return new TransactionSnapshot(snapshotTime, readPointer, writePointer,
+                                   invalidCopy, inProgressCopy, committingCopy, committedCopy);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java
new file mode 100644
index 0000000..f7edc8b
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionStateStorage.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.util.concurrent.Service;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * Defines the common contract for persisting transaction state changes.
+ */
+public interface TransactionStateStorage extends Service {
+
+  /**
+   * Persists a snapshot of transaction state to an output stream.
+   */
+  public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException;
+
+  /**
+   * Persists a snapshot of transaction state.
+   */
+  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException;
+
+  /**
+   * Returns the most recent snapshot that has been successfully written.  Note that this may return {@code null}
+   * if no completed snapshot files are found.
+   */
+  public TransactionSnapshot getLatestSnapshot() throws IOException;
+
+  /**
+   * Returns the most recent transaction visibility state that has been successfully written.
+   * Note that this may return {@code null} if no completed snapshot files are found.
+   * @return {@link TransactionVisibilityState}
+   */
+  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException;
+
+  /**
+   * Removes any snapshots prior to the {@code numberToKeep} most recent.
+   *
+   * @param numberToKeep The number of most recent snapshots to keep.
+   * @throws IOException If an error occurs while deleting old snapshots.
+   * @return The timestamp of the oldest snapshot kept.
+   */
+  public long deleteOldSnapshots(int numberToKeep) throws IOException;
+
+  /**
+   * Returns the (non-qualified) names of available snapshots.
+   */
+  public List<String> listSnapshots() throws IOException;
+
+  /**
+   * Returns all {@link TransactionLog}s with a timestamp greater than or equal to the given timestamp.  Note that
+   * the returned list is guaranteed to be sorted in ascending timestamp order.
+   */
+  public List<TransactionLog> getLogsSince(long timestamp) throws IOException;
+
+  /**
+   * Creates a new {@link TransactionLog}.
+   */
+  public TransactionLog createLog(long timestamp) throws IOException;
+
+  /**
+   * Returns the (non-qualified) names of available logs.
+   */
+  public List<String> listLogs() throws IOException;
+
+  /**
+   * Removes any transaction logs with a timestamp older than the given value.  Logs must be removed based on timestamp
+   * to ensure we can fully recover state based on a given snapshot.
+   * @param timestamp The timestamp to delete up to.  Logs with a timestamp less than this value will be removed.
+   * @throws IOException If an error occurs while removing logs.
+   */
+  public void deleteLogsOlderThan(long timestamp) throws IOException;
+
+  /**
+   * Creates the directories required for the transaction state storage.
+   * @throws IOException If an error occurred during the creation of required directories for transaction state storage.
+   */
+  public void setupStorage() throws IOException;
+
+  /**
+   * Returns a string representation of the location used for persistence.
+   */
+  public String getLocation();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java
new file mode 100644
index 0000000..cf845af
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionVisibilityState.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.tephra.TransactionManager;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+
+/**
+ * Transaction Visibility state contains information required by TransactionProcessor CoProcessor
+ * to determine cell visibility.
+ */
+public interface TransactionVisibilityState {
+
+  /**
+   * Returns the timestamp from when this snapshot was created.
+   */
+  long getTimestamp();
+
+  /**
+   * Returns the read pointer at the time of the snapshot.
+   */
+  long getReadPointer();
+
+  /**
+   * Returns the next write pointer at the time of the snapshot.
+   */
+  long getWritePointer();
+
+  /**
+   * Returns the list of invalid write pointers at the time of the snapshot.
+   */
+  Collection<Long> getInvalid();
+
+  /**
+   * Returns the map of write pointers to in-progress transactions at the time of the snapshot.
+   */
+  NavigableMap<Long, TransactionManager.InProgressTx> getInProgress();
+
+  /**
+   * @return transaction id {@code X} such that any of the transactions newer than {@code X} might be invisible to
+   *         some of the currently in-progress transactions or to those that will be started <p>
+   *         NOTE: the returned tx id can be invalid.
+   */
+  long getVisibilityUpperBound();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java b/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java
new file mode 100644
index 0000000..01decb0
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains interfaces and implementations for persisting transaction state.
+ */
+package org.apache.tephra.persist;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java b/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java
new file mode 100644
index 0000000..242a6fe
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/rpc/RPCServiceHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.rpc;
+
+/**
+ * Defines lifecycle interface for all rpc handlers.
+ */
+public interface RPCServiceHandler {
+
+  void init() throws Exception;
+
+  void destroy() throws Exception;
+}