You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:43 UTC

[43/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java
deleted file mode 100644
index a462566..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSUtil.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.persist;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.lang.reflect.Method;
-
-/**
- * Utility for handling HDFS file lease recovery.  This is a copy-n-paste fork of
- * {@link org.apache.hadoop.hbase.util.FSHDFSUtils} from the latest HBase 0.94 version (as of 0.94.12),
- * which contains some additional fixes not present in our current HBase dependency version --
- * mainly checking the return value of the {@code DistributedFileSystem.recoverLease()} call to verify that
- * recovery succeeded.
- */
-public class HDFSUtil {
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSUtil.class);
-  /**
-   * Recover the lease from HDFS, retrying multiple times.
-   */
-  public void recoverFileLease(final FileSystem fs, final Path p,
-                               Configuration conf)
-    throws IOException {
-    // lease recovery not needed for local file system case.
-    if (!(fs instanceof DistributedFileSystem)) {
-      return;
-    }
-    recoverDFSFileLease((DistributedFileSystem) fs, p, conf);
-  }
-
-  /*
-   * Run the dfs recover lease. recoverLease is asynchronous. It returns:
-   *    - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
-   *    - true when the lease recovery has succeeded or the file is closed.
-   * But, we have to be careful.  Each time we call recoverLease, it starts the recover lease
-   * process over from the beginning.  We could put ourselves in a situation where we are
-   * doing nothing but starting a recovery, interrupting it to start again, and so on.
-   * The findings over in HBASE-8354 have it that the namenode will try to recover the lease
-   * on the file's primary node.  If all is well, it should return near immediately.  But,
-   * as is common, it is the very primary node that has crashed and so the namenode will be
-   * stuck waiting on a socket timeout before it will ask another datanode to start the
-   * recovery. It does not help if we call recoverLease in the meantime and in particular,
-   * subsequent to the socket timeout, a recoverLease invocation will cause us to start
-   * over from square one (possibly waiting on socket timeout against primary node).  So,
-   * in the below, we do the following:
-   * 1. Call recoverLease.
-   * 2. If it returns true, break.
-   * 3. If it returns false, wait a few seconds and then call it again.
-   * 4. If it returns true, break.
-   * 5. If it returns false, wait for what we think the datanode socket timeout is
-   * (configurable) and then try again.
-   * 6. If it returns true, break.
-   * 7. If it returns false, repeat starting at step 5. above.
-   *
-   * If HDFS-4525 is available, call it every second and we might be able to exit early.
-   */
-  boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
-                              final Configuration conf)
-    throws IOException {
-    LOG.info("Recovering lease on dfs file " + p);
-    long startWaiting = System.currentTimeMillis();
-    // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
-    // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
-    // beyond that limit 'to be safe'.
-    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
-    // This setting should be what the cluster dfs heartbeat is set to.
-    long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 3000);
-    // This should be set to how long it'll take for us to timeout against primary datanode if it
-    // is dead.  We set it to 61 seconds, 1 second more than the default READ_TIMEOUT in HDFS, the
-    // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY.
-    long subsequentPause = conf.getInt("hbase.lease.recovery.dfs.timeout", 61 * 1000);
-
-    Method isFileClosedMeth = null;
-    // whether we need to look for isFileClosed method
-    boolean findIsFileClosedMeth = true;
-    boolean recovered = false;
-    // We break the loop if the lease recovery succeeds, we time out, or an exception is thrown.
-    for (int nbAttempt = 0; !recovered; nbAttempt++) {
-      recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
-      if (recovered || checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
-        break;
-      }
-      try {
-        // On the first time through wait the short 'firstPause'.
-        if (nbAttempt == 0) {
-          Thread.sleep(firstPause);
-        } else {
-          // Cycle here until subsequentPause elapses.  While spinning, check isFileClosed if
-          // available (should be in hadoop 2.0.5... not in hadoop 1 though).
-          long localStartWaiting = System.currentTimeMillis();
-          while ((System.currentTimeMillis() - localStartWaiting) <
-            subsequentPause) {
-            Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
-            if (findIsFileClosedMeth) {
-              try {
-                isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
-                                                            new Class[]{ Path.class });
-              } catch (NoSuchMethodException nsme) {
-                LOG.debug("isFileClosed not available");
-              } finally {
-                findIsFileClosedMeth = false;
-              }
-            }
-            if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
-              recovered = true;
-              break;
-            }
-          }
-        }
-      } catch (InterruptedException ie) {
-        InterruptedIOException iioe = new InterruptedIOException();
-        iioe.initCause(ie);
-        throw iioe;
-      }
-    }
-    return recovered;
-  }
-
-  boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
-                          final int nbAttempt, final Path p, final long startWaiting) {
-    if (recoveryTimeout < System.currentTimeMillis()) {
-      LOG.warn("Cannot recoverLease after trying for " +
-                 conf.getInt("hbase.lease.recovery.timeout", 900000) +
-                 "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
-                 getLogMessageDetail(nbAttempt, p, startWaiting));
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Try to recover the lease.
-   * @param dfs The filesystem instance.
-   * @param nbAttempt Count number of this attempt.
-   * @param p Path of the file to recover.
-   * @param startWaiting Timestamp of when we started attempting to recover the file lease.
-   * @return True if dfs#recoverLease returned true.
-   * @throws java.io.FileNotFoundException if the file to recover does not exist
-   */
-  boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
-                       final long startWaiting)
-    throws FileNotFoundException {
-    boolean recovered = false;
-    try {
-      recovered = dfs.recoverLease(p);
-      LOG.info("recoverLease=" + recovered + ", " +
-                 getLogMessageDetail(nbAttempt, p, startWaiting));
-    } catch (IOException e) {
-      if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
-        // This exception comes out instead of FNFE, fix it
-        throw new FileNotFoundException("The given file wasn't found at " + p);
-      } else if (e instanceof FileNotFoundException) {
-        throw (FileNotFoundException) e;
-      }
-      LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
-    }
-    return recovered;
-  }
-
-  /**
-   * @param nbAttempt Attempt number for the lease recovery.
-   * @param p Path of the file to recover.
-   * @param startWaiting Timestamp of when we started attempting to recover the file lease.
-   * @return Detail to append to any log message around lease recovering.
-   */
-  private String getLogMessageDetail(final int nbAttempt, final Path p, final long startWaiting) {
-    return "attempt=" + nbAttempt + " on file=" + p + " after " +
-      (System.currentTimeMillis() - startWaiting) + "ms";
-  }
-
-  /**
-   * Call HDFS-4525 isFileClosed if it is available.
-   * @param dfs Filesystem instance to use.
-   * @param m Method instance to call.
-   * @param p Path of the file whose closed state is checked.
-   * @return True if file is closed.
-   */
-  private boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) {
-    try {
-      return (Boolean) m.invoke(dfs, p);
-    } catch (SecurityException e) {
-      LOG.warn("No access", e);
-    } catch (Exception e) {
-      LOG.warn("Failed invocation for " + p.toString(), e);
-    }
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
deleted file mode 100644
index 590aaa5..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.metrics.MetricsCollector;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-/**
- * Reads and writes transaction logs against files in the local filesystem.
- */
-public class LocalFileTransactionLog extends AbstractTransactionLog {
-  private final File logFile;
-
-  /**
-   * Creates a new transaction log using the given file instance.
-   * @param logFile The log file to use.
-   */
-  public LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector) {
-    super(timestamp, metricsCollector);
-    this.logFile = logFile;
-  }
-
-  @Override
-  public String getName() {
-    return logFile.getAbsolutePath();
-  }
-
-  @Override
-  protected TransactionLogWriter createWriter() throws IOException {
-    return new LogWriter(logFile);
-  }
-
-  @Override
-  public TransactionLogReader getReader() throws IOException {
-    return new LogReader(logFile);
-  }
-
-  private static final class LogWriter implements TransactionLogWriter {
-    private final FileOutputStream fos;
-    private final DataOutputStream out;
-
-    public LogWriter(File logFile) throws IOException {
-      this.fos = new FileOutputStream(logFile);
-      this.out = new DataOutputStream(new BufferedOutputStream(fos, LocalFileTransactionStateStorage.BUFFER_SIZE));
-    }
-
-    @Override
-    public void append(Entry entry) throws IOException {
-      entry.write(out);
-    }
-
-    @Override
-    public void commitMarker(int count) throws IOException {
-      // skip for local file
-    }
-
-    @Override
-    public void sync() throws IOException {
-      out.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-      out.flush();
-      out.close();
-      fos.close();
-    }
-  }
-
-  private static final class LogReader implements TransactionLogReader {
-    private final FileInputStream fin;
-    private final DataInputStream in;
-    private Entry reuseEntry = new Entry();
-
-    public LogReader(File logFile) throws IOException {
-      this.fin = new FileInputStream(logFile);
-      this.in = new DataInputStream(new BufferedInputStream(fin, LocalFileTransactionStateStorage.BUFFER_SIZE));
-    }
-
-    @Override
-    public TransactionEdit next() throws IOException {
-      Entry entry = new Entry();
-      try {
-        entry.readFields(in);
-      } catch (EOFException eofe) {
-        // signal end of file by returning null
-        return null;
-      }
-      return entry.getEdit();
-    }
-
-    @Override
-    public TransactionEdit next(TransactionEdit reuse) throws IOException {
-      try {
-        reuseEntry.getKey().readFields(in);
-        reuse.readFields(in);
-      } catch (EOFException eofe) {
-        // signal end of file by returning null
-        return null;
-      }
-      return reuse;
-    }
-
-    @Override
-    public void close() throws IOException {
-      in.close();
-      fin.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
deleted file mode 100644
index 4c3539d..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
-import com.google.common.io.Files;
-import com.google.common.primitives.Longs;
-import com.google.inject.Inject;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * Persists transaction snapshots and write-ahead logs to files on the local filesystem.
- */
-public class LocalFileTransactionStateStorage extends AbstractTransactionStateStorage {
-  private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress.";
-  private static final String SNAPSHOT_FILE_PREFIX = "snapshot.";
-  private static final String LOG_FILE_PREFIX = "txlog.";
-  private static final Logger LOG = LoggerFactory.getLogger(LocalFileTransactionStateStorage.class);
-  static final int BUFFER_SIZE = 16384;
-
-  private static final FilenameFilter SNAPSHOT_FILE_FILTER = new FilenameFilter() {
-    @Override
-    public boolean accept(File file, String s) {
-      return s.startsWith(SNAPSHOT_FILE_PREFIX);
-    }
-  };
-
-  private final String configuredSnapshotDir;
-  private final MetricsCollector metricsCollector;
-  private File snapshotDir;
-
-  @Inject
-  public LocalFileTransactionStateStorage(Configuration conf, SnapshotCodecProvider codecProvider,
-                                          MetricsCollector metricsCollector) {
-    super(codecProvider);
-    this.configuredSnapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR);
-    this.metricsCollector = metricsCollector;
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    Preconditions.checkState(configuredSnapshotDir != null,
-        "Snapshot directory is not configured.  Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR +
-        " in configuration.");
-    snapshotDir = new File(configuredSnapshotDir);
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    // nothing to do
-  }
-
-  @Override
-  public String getLocation() {
-    return snapshotDir.getAbsolutePath();
-  }
-
-  @Override
-  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
-    // save the snapshot to a temporary file
-    File snapshotTmpFile = new File(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
-    LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile);
-    OutputStream out = Files.newOutputStreamSupplier(snapshotTmpFile).getOutput();
-    boolean threw = true;
-    try {
-      codecProvider.encode(out, snapshot);
-      threw = false;
-    } finally {
-      Closeables.close(out, threw);
-    }
-
-    // move the temporary file into place with the correct filename
-    File finalFile = new File(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
-    if (!snapshotTmpFile.renameTo(finalFile)) {
-      throw new IOException("Failed renaming temporary snapshot file " + snapshotTmpFile.getName() + " to " +
-          finalFile.getName());
-    }
-
-    LOG.debug("Completed snapshot to file {}", finalFile);
-  }
-
-  @Override
-  public TransactionSnapshot getLatestSnapshot() throws IOException {
-    InputStream is = getLatestSnapshotInputStream();
-    if (is == null) {
-      return null;
-    }
-    try {
-      return readSnapshotFile(is);
-    } finally {
-      is.close();
-    }
-  }
-
-  @Override
-  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
-    InputStream is = getLatestSnapshotInputStream();
-    if (is == null) {
-      return null;
-    }
-    try {
-      return codecProvider.decodeTransactionVisibilityState(is);
-    } finally {
-      is.close();
-    }
-  }
-
-  private InputStream getLatestSnapshotInputStream() throws IOException {
-    File[] snapshotFiles = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER);
-    TimestampedFilename mostRecent = null;
-    for (File file : snapshotFiles) {
-      TimestampedFilename tsFile = new TimestampedFilename(file);
-      if (mostRecent == null || tsFile.compareTo(mostRecent) > 0) {
-        mostRecent = tsFile;
-      }
-    }
-
-    if (mostRecent == null) {
-      LOG.info("No snapshot files found in {}", snapshotDir.getAbsolutePath());
-      return null;
-    }
-
-    return new FileInputStream(mostRecent.getFile());
-  }
-
-  private TransactionSnapshot readSnapshotFile(InputStream is) throws IOException {
-    return codecProvider.decode(is);
-  }
-
-  @Override
-  public long deleteOldSnapshots(int numberToKeep) throws IOException {
-    File[] snapshotFiles = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER);
-    if (snapshotFiles.length == 0) {
-      return -1;
-    }
-    TimestampedFilename[] snapshotFilenames = new TimestampedFilename[snapshotFiles.length];
-    for (int i = 0; i < snapshotFiles.length; i++) {
-      snapshotFilenames[i] = new TimestampedFilename(snapshotFiles[i]);
-    }
-    Arrays.sort(snapshotFilenames, Collections.reverseOrder());
-    if (snapshotFilenames.length <= numberToKeep) {
-      // nothing to delete, just return the oldest timestamp
-      return snapshotFilenames[snapshotFilenames.length - 1].getTimestamp();
-    }
-    int toRemoveCount = snapshotFilenames.length - numberToKeep;
-    TimestampedFilename[] toRemove = new TimestampedFilename[toRemoveCount];
-    System.arraycopy(snapshotFilenames, numberToKeep, toRemove, 0, toRemoveCount);
-    int removedCnt = 0;
-    for (int i = 0; i < toRemove.length; i++) {
-      File currentFile = toRemove[i].getFile();
-      LOG.debug("Removing old snapshot file {}", currentFile.getAbsolutePath());
-      if (!toRemove[i].getFile().delete()) {
-        LOG.error("Failed deleting snapshot file {}", currentFile.getAbsolutePath());
-      } else {
-        removedCnt++;
-      }
-    }
-    long oldestTimestamp = snapshotFilenames[numberToKeep - 1].getTimestamp();
-    LOG.info("Removed {} out of {} expected snapshot files older than {}", removedCnt, toRemoveCount, oldestTimestamp);
-    return oldestTimestamp;
-  }
-
-  @Override
-  public List<String> listSnapshots() throws IOException {
-    File[] snapshots = snapshotDir.listFiles(SNAPSHOT_FILE_FILTER);
-    return Lists.transform(Arrays.asList(snapshots), new Function<File, String>() {
-      @Nullable
-      @Override
-      public String apply(@Nullable File input) {
-        return input.getName();
-      }
-    });
-  }
-
-  @Override
-  public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
-    File[] logFiles = snapshotDir.listFiles(new LogFileFilter(timestamp, Long.MAX_VALUE));
-    TimestampedFilename[] timestampedFiles = new TimestampedFilename[logFiles.length];
-    for (int i = 0; i < logFiles.length; i++) {
-      timestampedFiles[i] = new TimestampedFilename(logFiles[i]);
-    }
-    // logs need to be processed in ascending order
-    Arrays.sort(timestampedFiles);
-    return Lists.transform(Arrays.asList(timestampedFiles), new Function<TimestampedFilename, TransactionLog>() {
-      @Nullable
-      @Override
-      public TransactionLog apply(@Nullable TimestampedFilename input) {
-        return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector);
-      }
-    });
-  }
-
-  @Override
-  public TransactionLog createLog(long timestamp) throws IOException {
-    File newLogFile = new File(snapshotDir, LOG_FILE_PREFIX + timestamp);
-    LOG.info("Creating new transaction log at {}", newLogFile.getAbsolutePath());
-    return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector);
-  }
-
-  @Override
-  public void deleteLogsOlderThan(long timestamp) throws IOException {
-    File[] logFiles = snapshotDir.listFiles(new LogFileFilter(0, timestamp));
-    int removedCnt = 0;
-    for (File file : logFiles) {
-      LOG.debug("Removing old transaction log {}", file.getPath());
-      if (file.delete()) {
-        removedCnt++;
-      } else {
-        LOG.warn("Failed to remove log file {}", file.getAbsolutePath());
-      }
-    }
-    LOG.debug("Removed {} transaction logs older than {}", removedCnt, timestamp);
-  }
-
-  @Override
-  public void setupStorage() throws IOException {
-    // create the directory if it doesn't exist
-    if (!snapshotDir.exists()) {
-      if (!snapshotDir.mkdirs()) {
-        throw new IOException("Failed to create directory " + configuredSnapshotDir +
-                                " for transaction snapshot storage");
-      }
-    } else {
-      Preconditions.checkState(snapshotDir.isDirectory(),
-                               "Configured snapshot directory " + configuredSnapshotDir + " is not a directory!");
-      Preconditions.checkState(snapshotDir.canWrite(), "Configured snapshot directory " +
-        configuredSnapshotDir + " exists but is not writable!");
-    }
-  }
-
-  @Override
-  public List<String> listLogs() throws IOException {
-    File[] logs = snapshotDir.listFiles(new LogFileFilter(0, Long.MAX_VALUE));
-    return Lists.transform(Arrays.asList(logs), new Function<File, String>() {
-      @Nullable
-      @Override
-      public String apply(@Nullable File input) {
-        return input.getName();
-      }
-    });
-  }
-
-  private static class LogFileFilter implements FilenameFilter {
-    private final long startTime;
-    private final long endTime;
-
-    public LogFileFilter(long startTime, long endTime) {
-      this.startTime = startTime;
-      this.endTime = endTime;
-    }
-
-    @Override
-    public boolean accept(File file, String s) {
-      if (s.startsWith(LOG_FILE_PREFIX)) {
-        String[] parts = s.split("\\.");
-        if (parts.length == 2) {
-          try {
-            long fileTime = Long.parseLong(parts[1]);
-            return fileTime >= startTime && fileTime < endTime;
-          } catch (NumberFormatException ignored) {
-            LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", s);
-          }
-        }
-      }
-      return false;
-    }
-  }
-
-  /**
-   * Represents a filename composed of a prefix and a ".timestamp" suffix.  This is useful for manipulating both
-   * snapshot and transaction log filenames.
-   */
-  private static class TimestampedFilename implements Comparable<TimestampedFilename> {
-    private File file;
-    private String prefix;
-    private long timestamp;
-
-    public TimestampedFilename(File file) {
-      this.file = file;
-      String[] parts = file.getName().split("\\.");
-      if (parts.length != 2) {
-        throw new IllegalArgumentException("Filename " + file.getName() +
-                                           " did not match the expected pattern prefix.timestamp");
-      }
-      prefix = parts[0];
-      timestamp = Long.parseLong(parts[1]);
-    }
-
-    public File getFile() {
-      return file;
-    }
-
-    public String getPrefix() {
-      return prefix;
-    }
-
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public int compareTo(TimestampedFilename other) {
-      int res = prefix.compareTo(other.getPrefix());
-      if (res == 0) {
-        res = Longs.compare(timestamp, other.getTimestamp());
-      }
-      return res;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java
deleted file mode 100644
index 3b6d0b7..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/NoOpTransactionStateStorage.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.snapshot.SnapshotCodec;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.inject.Inject;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Minimal {@link TransactionStateStorage} implementation that does nothing, i.e. does not maintain any actual state.
- */
-public class NoOpTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
-
-  private final SnapshotCodec codec;
-
-  @Inject
-  public NoOpTransactionStateStorage(SnapshotCodecProvider codecProvider) {
-    codec = codecProvider;
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-  }
-
-  @Override
-  public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
-    codec.encode(out, snapshot);
-  }
-
-  @Override
-  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
-  }
-
-  @Override
-  public TransactionSnapshot getLatestSnapshot() throws IOException {
-    return null;
-  }
-
-  @Override
-  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
-    return null;
-  }
-
-  @Override
-  public long deleteOldSnapshots(int numberToKeep) throws IOException {
-    return 0;
-  }
-
-  @Override
-  public List<String> listSnapshots() throws IOException {
-    return new ArrayList<>(0);
-  }
-
-  @Override
-  public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
-    return new ArrayList<>(0);
-  }
-
-  @Override
-  public TransactionLog createLog(long timestamp) throws IOException {
-    return new NoOpTransactionLog();
-  }
-
-  @Override
-  public void deleteLogsOlderThan(long timestamp) throws IOException {
-  }
-
-  @Override
-  public void setupStorage() throws IOException {
-  }
-
-  @Override
-  public List<String> listLogs() throws IOException {
-    return new ArrayList<>(0);
-  }
-
-  @Override
-  public String getLocation() {
-    return "no-op";
-  }
-
-  private static class NoOpTransactionLog implements TransactionLog {
-    private long timestamp = System.currentTimeMillis();
-
-    @Override
-    public String getName() {
-      return "no-op";
-    }
-
-    @Override
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public void append(TransactionEdit edit) throws IOException {
-    }
-
-    @Override
-    public void append(List<TransactionEdit> edits) throws IOException {
-    }
-
-    @Override
-    public void close() {
-    }
-
-    @Override
-    public TransactionLogReader getReader() {
-      return new TransactionLogReader() {
-        @Override
-        public TransactionEdit next() {
-          return null;
-        }
-
-        @Override
-        public TransactionEdit next(TransactionEdit reuse) {
-          return null;
-        }
-
-        @Override
-        public void close() {
-        }
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
deleted file mode 100644
index 405bbfd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import com.google.common.base.Objects;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * Represents a transaction state change in the {@link TransactionLog}.
- */
-public class TransactionEdit implements Writable {
-
-  /**
-   * The possible state changes for a transaction.
-   */
-  public enum State {
-    INPROGRESS, COMMITTING, COMMITTED, INVALID, ABORTED, MOVE_WATERMARK, TRUNCATE_INVALID_TX, CHECKPOINT
-  }
-
-  private long writePointer;
-
-  /**
-   * stores the value of visibility upper bound
-   * (see {@link TransactionManager.InProgressTx#getVisibilityUpperBound()})
-   * for edit of {@link State#INPROGRESS} only
-   */
-  private long visibilityUpperBound;
-  private long commitPointer;
-  private long expirationDate;
-  private State state;
-  private Set<ChangeId> changes;
-  /** Whether or not the COMMITTED change should be fully committed. */
-  private boolean canCommit;
-  private TransactionType type;
-  private Set<Long> truncateInvalidTx;
-  private long truncateInvalidTxTime;
-  private long parentWritePointer;
-  private long[] checkpointPointers;
-
-  // for Writable
-  public TransactionEdit() {
-    this.changes = Sets.newHashSet();
-    this.truncateInvalidTx = Sets.newHashSet();
-  }
-
-  // package private for testing
-  TransactionEdit(long writePointer, long visibilityUpperBound, State state, long expirationDate,
-                  Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type,
-                  Set<Long> truncateInvalidTx, long truncateInvalidTxTime, long parentWritePointer,
-                  long[] checkpointPointers) {
-    this.writePointer = writePointer;
-    this.visibilityUpperBound = visibilityUpperBound;
-    this.state = state;
-    this.expirationDate = expirationDate;
-    this.changes = changes != null ? changes : Collections.<ChangeId>emptySet();
-    this.commitPointer = commitPointer;
-    this.canCommit = canCommit;
-    this.type = type;
-    this.truncateInvalidTx = truncateInvalidTx != null ? truncateInvalidTx : Collections.<Long>emptySet();
-    this.truncateInvalidTxTime = truncateInvalidTxTime;
-    this.parentWritePointer = parentWritePointer;
-    this.checkpointPointers = checkpointPointers;
-  }
-
-  /**
-   * Returns the transaction write pointer assigned for the state change.
-   */
-  public long getWritePointer() {
-    return writePointer;
-  }
-
-  void setWritePointer(long writePointer) {
-    this.writePointer = writePointer;
-  }
-
-  public long getVisibilityUpperBound() {
-    return visibilityUpperBound;
-  }
-
-  void setVisibilityUpperBound(long visibilityUpperBound) {
-    this.visibilityUpperBound = visibilityUpperBound;
-  }
-
-  /**
-   * Returns the type of state change represented.
-   */
-  public State getState() {
-    return state;
-  }
-
-  void setState(State state) {
-    this.state = state;
-  }
-
-  /**
-   * Returns any expiration timestamp (in milliseconds) associated with the state change.  This should only
-   * be populated for changes of type {@link State#INPROGRESS}.
-   */
-  public long getExpiration() {
-    return expirationDate;
-  }
-
-  void setExpiration(long expirationDate) {
-    this.expirationDate = expirationDate;
-  }
-
-  /**
-   * @return the set of changed row keys associated with the state change.  This is only populated for edits
-   * of type {@link State#COMMITTING} or {@link State#COMMITTED}.
-   */
-  public Set<ChangeId> getChanges() {
-    return changes;
-  }
-
-  void setChanges(Set<ChangeId> changes) {
-    this.changes = changes;
-  }
-
-  /**
-   * Returns the write pointer used to commit the row key change set.  This is only populated for edits of type
-   * {@link State#COMMITTED}.
-   */
-  public long getCommitPointer() {
-    return commitPointer;
-  }
-
-  void setCommitPointer(long commitPointer) {
-    this.commitPointer = commitPointer;
-  }
-
-  /**
-   * Returns whether or not the transaction should be moved to the committed set.  This is only populated for edits
-   * of type {@link State#COMMITTED}.
-   */
-  public boolean getCanCommit() {
-    return canCommit;
-  }
-
-  void setCanCommit(boolean canCommit) {
-    this.canCommit = canCommit;
-  }
-
-  /**
-   * Returns the transaction type. This is only populated for edits of type {@link State#INPROGRESS} or 
-   * {@link State#ABORTED}.
-   */
-  public TransactionType getType() {
-    return type;
-  }
-
-  void setType(TransactionType type) {
-    this.type = type;
-  }
-
-  /**
-   * Returns the transaction ids to be removed from invalid transaction list. This is only populated for
-   * edits of type {@link State#TRUNCATE_INVALID_TX} 
-   */
-  public Set<Long> getTruncateInvalidTx() {
-    return truncateInvalidTx;
-  }
-
-  void setTruncateInvalidTx(Set<Long> truncateInvalidTx) {
-    this.truncateInvalidTx = truncateInvalidTx;
-  }
-
-  /**
-   * Returns the time until which the invalid transactions need to be truncated from invalid transaction list.
-   * This is only populated for  edits of type {@link State#TRUNCATE_INVALID_TX}
-   */
-  public long getTruncateInvalidTxTime() {
-    return truncateInvalidTxTime;
-  }
-
-  void setTruncateInvalidTxTime(long truncateInvalidTxTime) {
-    this.truncateInvalidTxTime = truncateInvalidTxTime;
-  }
-
-  /**
-   * Returns the parent write pointer for a checkpoint operation.  This is only populated for edits of type
-   * {@link State#CHECKPOINT}
-   */
-  public long getParentWritePointer() {
-    return parentWritePointer;
-  }
-
-  void setParentWritePointer(long parentWritePointer) {
-    this.parentWritePointer = parentWritePointer;
-  }
-
-  /**
-   * Returns the checkpoint write pointers for the edit.  This is only populated for edits of type
-   * {@link State#ABORTED}.
-   */
-  public long[] getCheckpointPointers() {
-    return checkpointPointers;
-  }
-
-  void setCheckpointPointers(long[] checkpointPointers) {
-    this.checkpointPointers = checkpointPointers;
-  }
-
-  /**
-   * Creates a new instance in the {@link State#INPROGRESS} state.
-   */
-  public static TransactionEdit createStarted(long writePointer, long visibilityUpperBound,
-                                              long expirationDate, TransactionType type) {
-    return new TransactionEdit(writePointer, visibilityUpperBound, State.INPROGRESS,
-                               expirationDate, null, 0L, false, type, null, 0L, 0L, null);
-  }
-
-  /**
-   * Creates a new instance in the {@link State#COMMITTING} state.
-   */
-  public static TransactionEdit createCommitting(long writePointer, Set<ChangeId> changes) {
-    return new TransactionEdit(writePointer, 0L, State.COMMITTING, 0L, changes, 0L, false, null, null, 0L, 0L, null);
-  }
-
-  /**
-   * Creates a new instance in the {@link State#COMMITTED} state.
-   */
-  public static TransactionEdit createCommitted(long writePointer, Set<ChangeId> changes, long nextWritePointer,
-                                                boolean canCommit) {
-    return new TransactionEdit(writePointer, 0L, State.COMMITTED, 0L, changes, nextWritePointer, canCommit, null, 
-                               null, 0L, 0L, null);
-  }
-
-  /**
-   * Creates a new instance in the {@link State#ABORTED} state.
-   */
-  public static TransactionEdit createAborted(long writePointer, TransactionType type, long[] checkpointPointers) {
-    return new TransactionEdit(writePointer, 0L, State.ABORTED, 0L, null, 0L, false, type, null, 0L, 0L,
-        checkpointPointers);
-  }
-
-  /**
-   * Creates a new instance in the {@link State#INVALID} state.
-   */
-  public static TransactionEdit createInvalid(long writePointer) {
-    return new TransactionEdit(writePointer, 0L, State.INVALID, 0L, null, 0L, false, null, null, 0L, 0L, null);
-  }
-
-  /**
-   * Creates a new instance in the {@link State#MOVE_WATERMARK} state.
-   */
-  public static TransactionEdit createMoveWatermark(long writePointer) {
-    return new TransactionEdit(writePointer, 0L, State.MOVE_WATERMARK, 0L, null, 0L, false, null, null, 0L, 0L, null);
-  }
-
-  /**
-   * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state.
-   */
-  public static TransactionEdit createTruncateInvalidTx(Set<Long> truncateInvalidTx) {
-    return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, truncateInvalidTx,
-        0L, 0L, null);
-  }
-
-  /**
-   * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state that carries a truncation time
-   * ({@code truncateInvalidTxTime}) rather than an explicit set of transaction ids.  No id set is passed to
-   * the constructor, so {@link #getTruncateInvalidTx()} returns an empty set for the created edit.
-   *
-   * @param truncateInvalidTxTime the time returned by {@link #getTruncateInvalidTxTime()} for the created edit
-   */
-  public static TransactionEdit createTruncateInvalidTxBefore(long truncateInvalidTxTime) {
-    return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, null, 
-                               truncateInvalidTxTime, 0L, null);
-  }
-
-  /**
-   * Creates a new instance in the {@link State#CHECKPOINT} state.
-   */
-  public static TransactionEdit createCheckpoint(long writePointer, long parentWritePointer) {
-    return new TransactionEdit(writePointer, 0L, State.CHECKPOINT, 0L, null, 0L, false, null, null, 0L,
-        parentWritePointer, null);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    TransactionEditCodecs.encode(this, out);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    TransactionEditCodecs.decode(this, in);
-  }
-
-  @Override
-  public final boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof TransactionEdit)) {
-      return false;
-    }
-
-    TransactionEdit that = (TransactionEdit) o;
-
-    return Objects.equal(this.writePointer, that.writePointer) &&
-        Objects.equal(this.visibilityUpperBound, that.visibilityUpperBound) &&
-        Objects.equal(this.commitPointer, that.commitPointer) &&
-        Objects.equal(this.expirationDate, that.expirationDate) &&
-        Objects.equal(this.state, that.state) &&
-        Objects.equal(this.changes, that.changes) &&
-        Objects.equal(this.canCommit, that.canCommit) &&
-        Objects.equal(this.type, that.type) &&
-        Objects.equal(this.truncateInvalidTx, that.truncateInvalidTx) &&
-        Objects.equal(this.truncateInvalidTxTime, that.truncateInvalidTxTime) &&
-        Objects.equal(this.parentWritePointer, that.parentWritePointer) &&
-        Arrays.equals(this.checkpointPointers, that.checkpointPointers);
-  }
-  
-  /**
-   * Computes a hash code that is consistent with {@link #equals(Object)}.
-   *
-   * The {@code checkpointPointers} array is hashed by content with {@link Arrays#hashCode(long[])}, mirroring
-   * the {@code Arrays.equals} comparison in {@code equals}.  Passing the array itself to
-   * {@code Objects.hashCode} would hash it by identity, so two equal edits could produce different hash codes.
-   */
-  @Override
-  public final int hashCode() {
-    return Objects.hashCode(writePointer, visibilityUpperBound, commitPointer, expirationDate, state, changes,
-                            canCommit, type, truncateInvalidTx, truncateInvalidTxTime, parentWritePointer,
-                            Arrays.hashCode(checkpointPointers));
-  }
-
-  /**
-   * Returns a human-readable description of this edit.  Only the size of the change set is included, not its
-   * contents, and the checkpoint pointers are rendered by value via {@link Arrays#toString(long[])}
-   * (which yields {@code "null"} for a null array) instead of the default array identity string.
-   */
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("writePointer", writePointer)
-      .add("visibilityUpperBound", visibilityUpperBound)
-      .add("commitPointer", commitPointer)
-      .add("expiration", expirationDate)
-      .add("state", state)
-      .add("changesSize", changes != null ? changes.size() : 0)
-      .add("canCommit", canCommit)
-      .add("type", type)
-      .add("truncateInvalidTx", truncateInvalidTx)
-      .add("truncateInvalidTxTime", truncateInvalidTxTime)
-      .add("parentWritePointer", parentWritePointer)
-      .add("checkpointPointers", Arrays.toString(checkpointPointers))
-      .toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
deleted file mode 100644
index e18b73f..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionType;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-/**
- * Utilities to handle encoding and decoding of {@link TransactionEdit} entries, while maintaining compatibility
- * with older versions of the serialized data.
- */
-public class TransactionEditCodecs {
-
-  private static final TransactionEditCodec[] ALL_CODECS = {
-      new TransactionEditCodecV1(),
-      new TransactionEditCodecV2(),
-      new TransactionEditCodecV3(),
-      new TransactionEditCodecV4()
-  };
-
-  private static final SortedMap<Byte, TransactionEditCodec> CODECS = new TreeMap<>();
-  static {
-    for (TransactionEditCodec codec : ALL_CODECS) {
-      CODECS.put(codec.getVersion(), codec);
-    }
-  }
-
-  /**
-   * Deserializes the encoded data from the given input stream, setting the values as fields
-   * on the given {@code TransactionEdit} instance.  This method expects the first value in the
-   * {@code DataInput} to be a byte representing the codec version used to serialize the instance.
-   *
-   * @param dest the transaction edit to populate with the deserialized values
-   * @param in the input stream containing the encoded data
-   * @throws IOException if the version byte does not match any registered codec, or if an error occurs
-   *     while deserializing from the input stream
-   */
-  public static void decode(TransactionEdit dest, DataInput in) throws IOException {
-    byte version = in.readByte();
-    TransactionEditCodec codec = CODECS.get(version);
-    if (codec == null) {
-      throw new IOException("TransactionEdit was serialized with an unknown codec version " + version +
-          ". Was it written with a newer version of Tephra?");
-    }
-    codec.decode(dest, in);
-  }
-
-  /**
-   * Serializes the given {@code TransactionEdit} instance with the latest available codec.
-   * This will first write out the version of the codec used to serialize the instance so that
-   * the correct codec can be used when calling {@link #decode(TransactionEdit, DataInput)}.
-   *
-   * @param src the transaction edit to serialize
-   * @param out the output stream to contain the data
-   * @throws IOException if an error occurs while serializing to the output stream
-   */
-  public static void encode(TransactionEdit src, DataOutput out) throws IOException {
-    TransactionEditCodec latestCodec = CODECS.get(CODECS.firstKey());
-    out.writeByte(latestCodec.getVersion());
-    latestCodec.encode(src, out);
-  }
-
-  /**
-   * Encodes the given transaction edit using a specific codec.  Note that this is only exposed
-   * for use by tests.
-   */
-  @VisibleForTesting
-  static void encode(TransactionEdit src, DataOutput out, TransactionEditCodec codec) throws IOException {
-    out.writeByte(codec.getVersion());
-    codec.encode(src, out);
-  }
-
-  /**
-   * Defines the interface used for encoding and decoding {@link TransactionEdit} instances to and from
-   * binary representations.
-   */
-  interface TransactionEditCodec {
-    /**
-     * Reads the encoded values from the data input stream and sets the fields in the given {@code TransactionEdit}
-     * instance.
-     *
-     * @param dest the instance on which to set all the deserialized values
-     * @param in the input stream containing the serialized data
-     * @throws IOException if an error occurs while deserializing the data
-     */
-    void decode(TransactionEdit dest, DataInput in) throws IOException;
-
-    /**
-     * Writes all the field values from the {@code TransactionEdit} instance in serialized form to the data
-     * output stream.
-     *
-     * @param src the instance to serialize to the stream
-     * @param out the output stream to contain the data
-     * @throws IOException if an error occurs while serializing the data
-     */
-    void encode(TransactionEdit src, DataOutput out) throws IOException;
-
-    /**
-     * Returns the version number for this codec.  Each codec should use a unique version number, with the newest
-     * codec having the lowest number.
-     */
-    byte getVersion();
-  }
-
-
-  // package-private for unit-test access
-  static class TransactionEditCodecV1 implements TransactionEditCodec {
-    /**
-     * Reads the V1 layout: write pointer, state ordinal, expiration, commit pointer, canCommit flag, and the
-     * length-prefixed change keys.  V1 data carries no visibility upper bound, so it is set to 0.
-     *
-     * @param dest the instance on which to set all the deserialized values
-     * @param in the input stream containing the serialized data
-     * @throws IOException if the state ordinal is out of range, a change key length is negative, or the
-     *     underlying stream fails
-     */
-    @Override
-    public void decode(TransactionEdit dest, DataInput in) throws IOException {
-      dest.setWritePointer(in.readLong());
-      int stateIdx = in.readInt();
-      // check the range explicitly instead of relying on ArrayIndexOutOfBoundsException for control flow
-      TransactionEdit.State[] states = TransactionEdit.State.values();
-      if (stateIdx < 0 || stateIdx >= states.length) {
-        throw new IOException("State enum ordinal value is out of range: " + stateIdx);
-      }
-      dest.setState(states[stateIdx]);
-      dest.setExpiration(in.readLong());
-      dest.setCommitPointer(in.readLong());
-      dest.setCanCommit(in.readBoolean());
-      int changeSize = in.readInt();
-      Set<ChangeId> changes = Sets.newHashSet();
-      for (int i = 0; i < changeSize; i++) {
-        int currentLength = in.readInt();
-        // corrupt data must surface as the declared IOException, not as a NegativeArraySizeException
-        if (currentLength < 0) {
-          throw new IOException("Change key length is negative: " + currentLength);
-        }
-        byte[] currentBytes = new byte[currentLength];
-        in.readFully(currentBytes);
-        changes.add(new ChangeId(currentBytes));
-      }
-      dest.setChanges(changes);
-      // 1st version did not store this info. It is safe to set the visibility upper bound to 0, it may decrease
-      // performance until this tx is finished, but correctness will be preserved.
-      dest.setVisibilityUpperBound(0);
-    }
-
-    /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for
-     *  unit-tests only */
-    @Override
-    @Deprecated
-    public void encode(TransactionEdit src, DataOutput out) throws IOException {
-      out.writeLong(src.getWritePointer());
-      // use ordinal for predictable size, though this does not support evolution
-      out.writeInt(src.getState().ordinal());
-      out.writeLong(src.getExpiration());
-      out.writeLong(src.getCommitPointer());
-      out.writeBoolean(src.getCanCommit());
-      Set<ChangeId> changes = src.getChanges();
-      if (changes == null) {
-        out.writeInt(0);
-      } else {
-        out.writeInt(changes.size());
-        for (ChangeId c : changes) {
-          byte[] cKey = c.getKey();
-          out.writeInt(cKey.length);
-          out.write(cKey);
-        }
-      }
-      // NOTE: we didn't have visibilityUpperBound in V1, it was added in V2
-      // we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
-      // it was added in V3
-    }
-
-    @Override
-    public byte getVersion() {
-      return -1;
-    }
-  }
-
-  // package-private for unit-test access
-  static class TransactionEditCodecV2 extends TransactionEditCodecV1 implements TransactionEditCodec {
-    @Override
-    public void decode(TransactionEdit dest, DataInput in) throws IOException {
-      super.decode(dest, in);
-      dest.setVisibilityUpperBound(in.readLong());
-    }
-
-    /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for
-     *  unit-tests only */
-    @Override
-    public void encode(TransactionEdit src, DataOutput out) throws IOException {
-      super.encode(src, out);
-      out.writeLong(src.getVisibilityUpperBound());
-      // NOTE: we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
-      // it was added in V3
-    }
-
-    @Override
-    public byte getVersion() {
-      return -2;
-    }
-  }
-
-  // TODO: refactor to avoid duplicate code among different version of codecs
-  // package-private for unit-test access
-  static class TransactionEditCodecV3 extends TransactionEditCodecV2 implements TransactionEditCodec {
-    @Override
-    public void decode(TransactionEdit dest, DataInput in) throws IOException {
-      super.decode(dest, in);
-      int typeIdx = in.readInt();
-      // null transaction type is represented as -1
-      if (typeIdx < 0) {
-        dest.setType(null);
-      } else {
-        try {
-          dest.setType(TransactionType.values()[typeIdx]);
-        } catch (ArrayIndexOutOfBoundsException e) {
-          throw new IOException("Type enum ordinal value is out of range: " + typeIdx);
-        }
-      }
-
-      int truncateInvalidTxSize = in.readInt();
-      Set<Long> truncateInvalidTx = emptySet(dest.getTruncateInvalidTx());
-      for (int i = 0; i < truncateInvalidTxSize; i++) {
-        truncateInvalidTx.add(in.readLong());
-      }
-      dest.setTruncateInvalidTx(truncateInvalidTx);
-      dest.setTruncateInvalidTxTime(in.readLong());
-    }
-
-    /**
-     * Returns an empty set to fill during decoding: a new hash set when {@code set} is null, otherwise the
-     * given set itself after clearing it in place (the same instance is returned, not a copy).
-     *
-     * NOTE(review): the package-private {@code TransactionEdit} constructor may assign
-     * {@code Collections.emptySet()} to this field; such a set would be returned as-is here and a later
-     * {@code add} on it would presumably fail -- confirm whether such edits are ever used as a decode target.
-     */
-    private <T> Set<T> emptySet(Set<T> set) {
-      if (set == null) {
-        return Sets.newHashSet();
-      }
-      set.clear();
-      return set;
-    }
-
-    @Override
-    public void encode(TransactionEdit src, DataOutput out) throws IOException {
-      super.encode(src, out);
-      // null transaction type is represented as -1
-      if (src.getType() == null) {
-        out.writeInt(-1);
-      } else {
-        out.writeInt(src.getType().ordinal());
-      }
-
-      Set<Long> truncateInvalidTx = src.getTruncateInvalidTx();
-      if (truncateInvalidTx == null) {
-        out.writeInt(0);
-      } else {
-        out.writeInt(truncateInvalidTx.size());
-        for (long id : truncateInvalidTx) {
-          out.writeLong(id);
-        }
-      }
-      out.writeLong(src.getTruncateInvalidTxTime());
-    }
-
-    @Override
-    public byte getVersion() {
-      return -3;
-    }
-  }
-
-  static class TransactionEditCodecV4 extends TransactionEditCodecV3 {
-    /**
-     * Reads the V3 fields, then the parent write pointer and the length-prefixed checkpoint pointers.
-     * A negative length is the marker {@code encode} writes for a null array; in that case the checkpoint
-     * pointers on {@code dest} are reset to null so that a reused {@code TransactionEdit} does not keep the
-     * pointers decoded for a previous entry.
-     *
-     * @param dest the instance on which to set all the deserialized values
-     * @param in the input stream containing the serialized data
-     * @throws IOException if an error occurs while deserializing the data
-     */
-    @Override
-    public void decode(TransactionEdit dest, DataInput in) throws IOException {
-      super.decode(dest, in);
-      dest.setParentWritePointer(in.readLong());
-      int checkpointPointersLen = in.readInt();
-      long[] checkpointPointers = null;
-      if (checkpointPointersLen >= 0) {
-        checkpointPointers = new long[checkpointPointersLen];
-        for (int i = 0; i < checkpointPointersLen; i++) {
-          checkpointPointers[i] = in.readLong();
-        }
-      }
-      // always assign, including null, so stale state from a reused dest is overwritten
-      dest.setCheckpointPointers(checkpointPointers);
-    }
-
-    @Override
-    public void encode(TransactionEdit src, DataOutput out) throws IOException {
-      super.encode(src, out);
-      out.writeLong(src.getParentWritePointer());
-      long[] checkpointPointers = src.getCheckpointPointers();
-      if (checkpointPointers == null) {
-        out.writeInt(-1);
-      } else {
-        out.writeInt(checkpointPointers.length);
-        for (int i = 0; i < checkpointPointers.length; i++) {
-          out.writeLong(checkpointPointers[i]);
-        }
-      }
-    }
-
-    @Override
-    public byte getVersion() {
-      return -4;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java
deleted file mode 100644
index f76a8f1..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLog.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Represents a log of transaction state changes.
- */
-public interface TransactionLog {
-
-  String getName();
-
-  long getTimestamp();
-
-  void append(TransactionEdit edit) throws IOException;
-
-  void append(List<TransactionEdit> edits) throws IOException;
-
-  void close() throws IOException;
-
-  TransactionLogReader getReader() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java
deleted file mode 100644
index 3a3eaca..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogReader.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Represents a reader for {@link TransactionLog} instances.
- */
-public interface TransactionLogReader extends Closeable {
-  /**
-   * Returns the next {@code TransactionEdit} from the log file, based on the current position, or {@code null}
-   * if the end of the file has been reached.
-   */
-  TransactionEdit next() throws IOException;
-
-  /**
-   * Populates {@code reuse} with the next {@code TransactionEdit}, based on the reader's current position in the
-   * log file.
-   * @param reuse The {@code TransactionEdit} instance to populate with the log entry data.
-   * @return The {@code TransactionEdit} instance, or {@code null} if the end of the file has been reached.
-   * @throws IOException If an error is encountered reading the log data.
-   */
-  TransactionEdit next(TransactionEdit reuse) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java
deleted file mode 100644
index 67d9aaf..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionLogWriter.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Common interface for transaction log writers used by classes extending {@link AbstractTransactionLog}.
- */
-public interface TransactionLogWriter extends Closeable {
-  /**
-   * Adds a new transaction entry to the log.  Note that this does not guarantee that the entry has been flushed
-   * to persistent storage until {@link #sync()} has been called.
-   *
-   * @param entry The transaction edit to append.
-   * @throws IOException If an error occurs while writing the edit to storage.
-   */
-  void append(AbstractTransactionLog.Entry entry) throws IOException;
-
-  /**
-   * Makes an entry of number of transaction entries that will follow in that log in a single sync.
-   *
-   * @param count Number of transaction entries.
-   * @throws IOException If an error occurs while writing the count to storage.
-   */
-  void commitMarker(int count) throws IOException;
-
-  /**
-   * Syncs any pending transaction edits added through {@link #append(AbstractTransactionLog.Entry)},
-   * but not yet flushed to durable storage.
-   *
-   * @throws IOException If an error occurs while flushing the outstanding edits.
-   */
-  void sync() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java
deleted file mode 100644
index 22bc449..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionSnapshot.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionManager;
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-
-/**
- * Represents an in-memory snapshot of the full transaction state.
- */
-public class TransactionSnapshot implements TransactionVisibilityState {
-  private long timestamp;
-  private long readPointer;
-  private long writePointer;
-  private Collection<Long> invalid;
-  private NavigableMap<Long, TransactionManager.InProgressTx> inProgress;
-  private Map<Long, Set<ChangeId>> committingChangeSets;
-  private Map<Long, Set<ChangeId>> committedChangeSets;
-
-  public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
-                             NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
-                             Map<Long, Set<ChangeId>> committing, Map<Long, Set<ChangeId>> committed) {
-    this(timestamp, readPointer, writePointer, invalid, inProgress);
-    this.committingChangeSets = committing;
-    this.committedChangeSets = committed;
-  }
-
-  public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid,
-                             NavigableMap<Long, TransactionManager.InProgressTx> inProgress) {
-    this.timestamp = timestamp;
-    this.readPointer = readPointer;
-    this.writePointer = writePointer;
-    this.invalid = invalid;
-    this.inProgress = inProgress;
-    this.committingChangeSets = Collections.emptyMap();
-    this.committedChangeSets = Collections.emptyMap();
-  }
-
-  @Override
-  public long getTimestamp() {
-    return timestamp;
-  }
-
-  @Override
-  public long getReadPointer() {
-    return readPointer;
-  }
-
-  @Override
-  public long getWritePointer() {
-    return writePointer;
-  }
-
-  @Override
-  public Collection<Long> getInvalid() {
-    return invalid;
-  }
-
-  @Override
-  public NavigableMap<Long, TransactionManager.InProgressTx> getInProgress() {
-    return inProgress;
-  }
-
-  @Override
-  public long getVisibilityUpperBound() {
-    // the readPointer of the oldest in-progress tx is the oldest in use
-    // todo: potential problem with not moving visibility upper bound for the whole duration of long-running tx
-    Map.Entry<Long, TransactionManager.InProgressTx> firstInProgress = inProgress.firstEntry();
-    if (firstInProgress == null) {
-      // using readPointer as smallest visible when no txs are there
-      return readPointer;
-    }
-    return firstInProgress.getValue().getVisibilityUpperBound();
-  }
-
-  /**
-   * Returns a map of transaction write pointer to sets of changed row keys for transactions that had called
-   * {@code InMemoryTransactionManager.canCommit(Transaction, Collection)} but not yet called
-   * {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot.
-   *
-   * @return a map of transaction write pointer to set of changed row keys.
-   */
-  public Map<Long, Set<ChangeId>> getCommittingChangeSets() {
-    return committingChangeSets;
-  }
-
-  /**
-   * Returns a map of transaction write pointer to set of changed row keys for transactions that had successfully called
-   * {@code InMemoryTransactionManager.commit(Transaction)} at the time of the snapshot.
-   *
-   * @return a map of transaction write pointer to set of changed row keys.
-   */
-  public Map<Long, Set<ChangeId>> getCommittedChangeSets() {
-    return committedChangeSets;
-  }
-
-  /**
-   * Checks that this instance matches another {@code TransactionSnapshot} instance.  Note that the equality check
-   * ignores the snapshot timestamp value, but includes all other properties.
-   *
-   * @param obj the other instance to check for equality.
-   * @return {@code true} if the instances are equal, {@code false} if not.
-   */
-  @Override
-  public boolean equals(Object obj) {
-    if (!(obj instanceof  TransactionSnapshot)) {
-      return false;
-    }
-    TransactionSnapshot other = (TransactionSnapshot) obj;
-    return readPointer == other.readPointer &&
-      writePointer == other.writePointer &&
-      invalid.equals(other.invalid) &&
-      inProgress.equals(other.inProgress) &&
-      committingChangeSets.equals(other.committingChangeSets) &&
-      committedChangeSets.equals(other.committedChangeSets);
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-        .add("timestamp", timestamp)
-        .add("readPointer", readPointer)
-        .add("writePointer", writePointer)
-        .add("invalidSize", invalid.size())
-        .add("inProgressSize", inProgress.size())
-        .add("committingSize", committingChangeSets.size())
-        .add("committedSize", committedChangeSets.size())
-        .toString();
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(readPointer, writePointer, invalid, inProgress, committingChangeSets, committedChangeSets);
-  }
-
-  /**
-   * Creates a new {@code TransactionSnapshot} instance with copies of all of the individual collections.
-   * @param readPointer current transaction read pointer
-   * @param writePointer current transaction write pointer
-   * @param invalid current list of invalid write pointers
-   * @param inProgress current map of in-progress write pointers to expiration timestamps
-   * @param committing current map of write pointers to change sets which have passed {@code canCommit()} but not
-   *                   yet committed
-   * @param committed current map of write pointers to change sets which have committed
-   * @return a new {@code TransactionSnapshot} instance
-   */
-  public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer,
-                                             long writePointer, Collection<Long> invalid,
-                                             NavigableMap<Long, TransactionManager.InProgressTx> inProgress,
-                                             Map<Long, Set<ChangeId>> committing,
-                                             NavigableMap<Long, Set<ChangeId>> committed) {
-    // copy invalid IDs
-    Collection<Long> invalidCopy = Lists.newArrayList(invalid);
-    // copy in-progress IDs and expirations
-    NavigableMap<Long, TransactionManager.InProgressTx> inProgressCopy = Maps.newTreeMap(inProgress);
-
-    // for committing and committed maps, we need to copy each individual Set as well to prevent modification
-    Map<Long, Set<ChangeId>> committingCopy = Maps.newHashMap();
-    for (Map.Entry<Long, Set<ChangeId>> entry : committing.entrySet()) {
-      committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
-    }
-
-    NavigableMap<Long, Set<ChangeId>> committedCopy = new TreeMap<>();
-    for (Map.Entry<Long, Set<ChangeId>> entry : committed.entrySet()) {
-      committedCopy.put(entry.getKey(), new HashSet<>(entry.getValue()));
-    }
-
-    return new TransactionSnapshot(snapshotTime, readPointer, writePointer,
-                                   invalidCopy, inProgressCopy, committingCopy, committedCopy);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java
deleted file mode 100644
index 0acb9bb..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionStateStorage.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import com.google.common.util.concurrent.Service;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-
-/**
- * Defines the common contract for persisting transaction state changes.
- */
-public interface TransactionStateStorage extends Service {
-
-  /**
-   * Persists a snapshot of transaction state to an output stream.
-   */
-  public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException;
-
-  /**
-   * Persists a snapshot of transaction state.
-   */
-  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException;
-
-  /**
-   * Returns the most recent snapshot that has been successfully written.  Note that this may return {@code null}
-   * if no completed snapshot files are found.
-   */
-  public TransactionSnapshot getLatestSnapshot() throws IOException;
-
-  /**
-   * Returns the most recent transaction visibility state that has been successfully written.
-   * Note that this may return {@code null} if no completed snapshot files are found.
-   * @return {@link TransactionVisibilityState}
-   */
-  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException;
-
-  /**
-   * Removes any snapshots prior to the {@code numberToKeep} most recent.
-   *
-   * @param numberToKeep The number of most recent snapshots to keep.
-   * @throws IOException If an error occurs while deleting old snapshots.
-   * @return The timestamp of the oldest snapshot kept.
-   */
-  public long deleteOldSnapshots(int numberToKeep) throws IOException;
-
-  /**
-   * Returns the (non-qualified) names of available snapshots.
-   */
-  public List<String> listSnapshots() throws IOException;
-
-  /**
-   * Returns all {@link TransactionLog}s with a timestamp greater than or equal to the given timestamp.  Note that
-   * the returned list is guaranteed to be sorted in ascending timestamp order.
-   */
-  public List<TransactionLog> getLogsSince(long timestamp) throws IOException;
-
-  /**
-   * Creates a new {@link TransactionLog}.
-   */
-  public TransactionLog createLog(long timestamp) throws IOException;
-
-  /**
-   * Returns the (non-qualified) names of available logs.
-   */
-  public List<String> listLogs() throws IOException;
-
-  /**
-   * Removes any transaction logs with a timestamp older than the given value.  Logs must be removed based on timestamp
-   * to ensure we can fully recover state based on a given snapshot.
-   * @param timestamp The timestamp to delete up to.  Logs with a timestamp less than this value will be removed.
-   * @throws IOException If an error occurs while removing logs.
-   */
-  public void deleteLogsOlderThan(long timestamp) throws IOException;
-
-  /**
-   * Create the directories required for the transaction state storage.
-   * @throws IOException If an error occurred during the creation of required directories for transaction state storage.
-   */
-  public void setupStorage() throws IOException;
-
-  /**
-   * Returns a string representation of the location used for persistence.
-   */
-  public String getLocation();
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java
deleted file mode 100644
index 68d17c3..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionVisibilityState.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TransactionManager;
-
-import java.util.Collection;
-import java.util.NavigableMap;
-
-/**
- * Transaction Visibility state contains information required by TransactionProcessor CoProcessor
- * to determine cell visibility.
- */
-public interface TransactionVisibilityState {
-
-  /**
-   * Returns the timestamp from when this snapshot was created.
-   */
-  long getTimestamp();
-
-  /**
-   * Returns the read pointer at the time of the snapshot.
-   */
-  long getReadPointer();
-
-  /**
-   * Returns the next write pointer at the time of the snapshot.
-   */
-  long getWritePointer();
-
-  /**
-   * Returns the list of invalid write pointers at the time of the snapshot.
-   */
-  Collection<Long> getInvalid();
-
-  /**
-   * Returns the map of write pointers to in-progress transactions at the time of the snapshot.
-   */
-  NavigableMap<Long, TransactionManager.InProgressTx> getInProgress();
-
-  /**
-   * @return transaction id {@code X} such that any of the transactions newer than {@code X} might be invisible to
-   *         some of the currently in-progress transactions or to those that will be started <p>
-   *         NOTE: the returned tx id can be invalid.
-   */
-  long getVisibilityUpperBound();
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java b/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java
deleted file mode 100644
index c94f0fe..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains interfaces and implementations for persisting transaction state.
- */
-package co.cask.tephra.persist;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java b/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java
deleted file mode 100644
index 664bfe3..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/rpc/RPCServiceHandler.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.rpc;
-
-/**
- * Defines lifecycle interface for all rpc handlers.
- */
-public interface RPCServiceHandler {
-
-  void init() throws Exception;
-
-  void destroy() throws Exception;
-}