Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:40 UTC

[34/51] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java
new file mode 100644
index 0000000..8abfc3b
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.tephra.distributed.thrift;
+
+
+public enum TTransactionType implements org.apache.thrift.TEnum {
+  SHORT(1),
+  LONG(2);
+
+  private final int value;
+
+  private TTransactionType(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  public static TTransactionType findByValue(int value) { 
+    switch (value) {
+      case 1:
+        return SHORT;
+      case 2:
+        return LONG;
+      default:
+        return null;
+    }
+  }
+}
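
For reference, a minimal round-trip sketch (illustrative only, not part of this
commit): the generated enum maps between the wire integer defined in the Thrift
IDL and the Java constant.

    import org.apache.tephra.distributed.thrift.TTransactionType;

    public class TTransactionTypeRoundTrip {
      public static void main(String[] args) {
        int wire = TTransactionType.LONG.getValue();                   // 2, per the IDL
        TTransactionType decoded = TTransactionType.findByValue(wire);
        System.out.println(decoded);                                   // LONG
        System.out.println(TTransactionType.findByValue(99));          // null: unknown value
      }
    }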

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java
new file mode 100644
index 0000000..ed081b5
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.tephra.distributed.thrift;
+
+
+public enum TVisibilityLevel implements org.apache.thrift.TEnum {
+  SNAPSHOT(1),
+  SNAPSHOT_EXCLUDE_CURRENT(2),
+  SNAPSHOT_ALL(3);
+
+  private final int value;
+
+  private TVisibilityLevel(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  public static TVisibilityLevel findByValue(int value) { 
+    switch (value) {
+      case 1:
+        return SNAPSHOT;
+      case 2:
+        return SNAPSHOT_EXCLUDE_CURRENT;
+      case 3:
+        return SNAPSHOT_ALL;
+      default:
+        return null;
+    }
+  }
+}
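
Since findByValue returns null for integer values this enum does not know (for
example, when decoding a message from a peer built against a newer IDL), callers
should handle the null case. A hedged sketch (the caller code is hypothetical):

    TVisibilityLevel level = TVisibilityLevel.findByValue(wireValue);
    if (level == null) {
      throw new IllegalArgumentException("Unknown TVisibilityLevel: " + wireValue);
    }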

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
new file mode 100644
index 0000000..c8bf22a
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.inmemory;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.tephra.InvalidTruncateTimeException;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of the tx system client that doesn't talk to any global service and does its best to meet the
+ * tx system requirements/expectations. It implements enough logic to support running flows (where each flowlet
+ * uses its own detached tx system client, without talking to the others or sharing any state) with a "process
+ * exactly once" guarantee, provided no failures happen.
+ *
+ * NOTE: Will NOT detect conflicts. May leave inconsistent state when the process crashes. Does NOT provide even
+ *       read isolation guarantees.
+ *
+ * Good for performance testing, for demoing high throughput, and for use cases with relaxed tx guarantees.
+ */
+public class DetachedTxSystemClient implements TransactionSystemClient {
+  // Dataset and queue logic relies on tx ids growing monotonically even across restarts. Hence we need to start
+  // with a value that is guaranteed to be bigger than the last one used before the restart.
+  // NOTE: the code below assumes a single client starts no more than TxConstants.MAX_TX_PER_MS transactions
+  //       per millisecond
+  private AtomicLong generator = new AtomicLong(System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS);
+
+  @Override
+  public Transaction startShort() {
+    long wp = getWritePointer();
+    // NOTE: -1 here is because we have logic that uses (readpointer + 1) as an "exclusive stop key" in some datasets
+    return new Transaction(
+      Long.MAX_VALUE - 1, wp, new long[0], new long[0],
+      Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT);
+  }
+
+  private long getWritePointer() {
+    long wp = generator.incrementAndGet();
+    // NOTE: using TxConstants.MAX_TX_PER_MS to stay at least close to the real manager's write pointers
+    long now = System.currentTimeMillis();
+    if (wp < now * TxConstants.MAX_TX_PER_MS) {
+      // try to advance to align with the timestamp, but only once: if it fails, we'll just try again with the next tx
+      long advanced = now * TxConstants.MAX_TX_PER_MS;
+      if (generator.compareAndSet(wp, advanced)) {
+        wp = advanced;
+      }
+    }
+    return wp;
+  }
+
+  @Override
+  public Transaction startShort(int timeout) {
+    return startShort();
+  }
+
+  @Override
+  public Transaction startLong() {
+    return startShort();
+  }
+
+  @Override
+  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) {
+    return true;
+  }
+
+  @Override
+  public boolean commit(Transaction tx) {
+    return true;
+  }
+
+  @Override
+  public void abort(Transaction tx) {
+    // do nothing
+  }
+
+  @Override
+  public boolean invalidate(long tx) {
+    return true;
+  }
+
+  @Override
+  public Transaction checkpoint(Transaction tx) {
+    long newWritePointer = getWritePointer();
+    LongArrayList newCheckpointPointers = new LongArrayList(tx.getCheckpointWritePointers());
+    newCheckpointPointers.add(newWritePointer);
+    return new Transaction(tx, newWritePointer, newCheckpointPointers.toLongArray());
+  }
+
+  @Override
+  public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
+    throw new TransactionCouldNotTakeSnapshotException(
+        "Snapshot not implemented in detached transaction system client");
+  }
+
+  @Override
+  public String status() {
+    return TxConstants.STATUS_OK;
+  }
+
+  @Override
+  public void resetState() {
+    // do nothing
+  }
+
+  @Override
+  public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
+    return true;
+  }
+
+  @Override
+  public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
+    return true;
+  }
+
+  @Override
+  public int getInvalidSize() {
+    return 0;
+  }
+}
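
A minimal usage sketch of the detached client (illustrative, not part of this
commit). Write pointers stay monotonic across restarts because the generator is
seeded with currentTimeMillis * TxConstants.MAX_TX_PER_MS, and commits always
succeed because no conflict detection is performed:

    import org.apache.tephra.Transaction;
    import org.apache.tephra.inmemory.DetachedTxSystemClient;

    public class DetachedClientExample {
      public static void main(String[] args) {
        DetachedTxSystemClient client = new DetachedTxSystemClient();
        Transaction tx = client.startShort();
        System.out.println(tx.getWritePointer()); // ~ currentTimeMillis * MAX_TX_PER_MS
        // canCommit()/commit() unconditionally return true in this implementation
        client.commit(tx);
      }
    }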

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
new file mode 100644
index 0000000..823f934
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.inmemory;
+
+import com.google.common.util.concurrent.AbstractService;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.DiscoveryService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Transaction server that manages transaction data for the Reactor.
+ * <p>
+ *   The transaction server is HA: multiple instances can be started, but only one is active at a time and
+ *   registers itself in the discovery service.
+ * </p>
+ */
+public class InMemoryTransactionService extends AbstractService {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryTransactionService.class);
+
+  private final DiscoveryService discoveryService;
+  private final String serviceName;
+  protected final Provider<TransactionManager> txManagerProvider;
+  private Cancellable cancelDiscovery;
+  protected TransactionManager txManager;
+
+  // thrift server config
+  protected final String address;
+  protected final int port;
+  protected final int threads;
+  protected final int ioThreads;
+  protected final int maxReadBufferBytes;
+
+  @Inject
+  public InMemoryTransactionService(Configuration conf,
+                            DiscoveryService discoveryService,
+                            Provider<TransactionManager> txManagerProvider) {
+
+    this.discoveryService = discoveryService;
+    this.txManagerProvider = txManagerProvider;
+    this.serviceName = conf.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME,
+                                TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME);
+
+    address = conf.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS, TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS);
+    port = conf.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT);
+
+    // Retrieve the number of threads for the service
+    threads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS,
+                          TxConstants.Service.DEFAULT_DATA_TX_SERVER_THREADS);
+    ioThreads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS,
+                            TxConstants.Service.DEFAULT_DATA_TX_SERVER_IO_THREADS);
+
+    maxReadBufferBytes = conf.getInt(TxConstants.Service.CFG_DATA_TX_THRIFT_MAX_READ_BUFFER,
+                                     TxConstants.Service.DEFAULT_DATA_TX_THRIFT_MAX_READ_BUFFER);
+
+    LOG.info("Configuring TransactionService" +
+               ", address: " + address +
+               ", port: " + port +
+               ", threads: " + threads +
+               ", io threads: " + ioThreads +
+               ", max read buffer (bytes): " + maxReadBufferBytes);
+  }
+
+  protected void undoRegister() {
+    if (cancelDiscovery != null) {
+      cancelDiscovery.cancel();
+    }
+  }
+
+  protected void doRegister() {
+    cancelDiscovery = discoveryService.register(new Discoverable() {
+      @Override
+      public String getName() {
+        return serviceName;
+      }
+
+      @Override
+      public InetSocketAddress getSocketAddress() {
+        return getAddress();
+      }
+    });
+  }
+
+  protected InetSocketAddress getAddress() {
+    return new InetSocketAddress(1);
+  }
+
+  @Override
+  protected void doStart() {
+    try {
+      txManager = txManagerProvider.get();
+      txManager.startAndWait();
+      doRegister();
+      LOG.info("Transaction Thrift service started successfully on " + getAddress());
+      notifyStarted();
+    } catch (Throwable t) {
+      LOG.info("Transaction Thrift service didn't start on " + getAddress());
+      notifyFailed(t);
+    }
+  }
+
+  @Override
+  protected void doStop() {
+    undoRegister();
+    txManager.stopAndWait();
+    notifyStopped();
+  }
+
+}
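
The constructor reads its thrift server settings from the supplied Hadoop
Configuration. A configuration sketch using the constants referenced above
(the concrete values are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tephra.TxConstants;

    Configuration conf = new Configuration();
    conf.set(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS, "0.0.0.0");
    conf.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, 15165);
    conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS, 25);
    conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS, 2);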

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
new file mode 100644
index 0000000..da38dd2
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.inmemory;
+
+import com.google.inject.Inject;
+import org.apache.tephra.InvalidTruncateTimeException;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * In-memory implementation of {@link TransactionSystemClient} that delegates all operations to a local
+ * {@link TransactionManager} instance.
+ */
+public class InMemoryTxSystemClient implements TransactionSystemClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryTxSystemClient.class);
+
+  TransactionManager txManager;
+
+  @Inject
+  public InMemoryTxSystemClient(TransactionManager txmgr) {
+    txManager = txmgr;
+  }
+
+  @Override
+  public Transaction startLong() {
+    return txManager.startLong();
+  }
+
+  @Override
+  public Transaction startShort() {
+    return txManager.startShort();
+  }
+
+  @Override
+  public Transaction startShort(int timeout) {
+    return txManager.startShort(timeout);
+  }
+
+  @Override
+  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
+    return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
+  }
+
+  @Override
+  public boolean commit(Transaction tx) throws TransactionNotInProgressException {
+    return txManager.commit(tx);
+  }
+
+  @Override
+  public void abort(Transaction tx) {
+    txManager.abort(tx);
+  }
+
+  @Override
+  public boolean invalidate(long tx) {
+    return txManager.invalidate(tx);
+  }
+
+  @Override
+  public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException {
+    return txManager.checkpoint(tx);
+  }
+
+  @Override
+  public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
+    try {
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      try {
+        boolean snapshotTaken = txManager.takeSnapshot(out);
+        if (!snapshotTaken) {
+          throw new TransactionCouldNotTakeSnapshotException("Transaction manager did not take a snapshot.");
+        }
+      } finally {
+        out.close();
+      }
+      return new ByteArrayInputStream(out.toByteArray());
+    } catch (IOException e) {
+      LOG.error("Snapshot could not be taken", e);
+      throw new TransactionCouldNotTakeSnapshotException(e.getMessage());
+    }
+  }
+
+  @Override
+  public String status() {
+    return txManager.isRunning() ? TxConstants.STATUS_OK : TxConstants.STATUS_NOTOK;
+  }
+
+  @Override
+  public void resetState() {
+    txManager.resetState();
+  }
+
+  @Override
+  public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
+    return txManager.truncateInvalidTx(invalidTxIds);
+  }
+
+  @Override
+  public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
+    return txManager.truncateInvalidTxBefore(time);
+  }
+
+  @Override
+  public int getInvalidSize() {
+    return txManager.getInvalidSize();
+  }
+}
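
Note that getSnapshotInputStream() above fully serializes the manager's state
into a byte array before returning, so the stream remains readable after the
call completes. A hedged consumption sketch (the client variable and the sink
are hypothetical):

    try (java.io.InputStream in = client.getSnapshotInputStream()) {
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) != -1) {
        // hand the snapshot bytes to whatever sink persists or decodes them
      }
    }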

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
new file mode 100644
index 0000000..2f60225
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.inmemory;
+
+import org.apache.tephra.InvalidTruncateTimeException;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Dummy implementation of TxSystemClient. May be useful for perf testing.
+ */
+public class MinimalTxSystemClient implements TransactionSystemClient {
+  private long currentTxPointer = 1;
+
+  @Override
+  public Transaction startShort() {
+    long wp = currentTxPointer++;
+    // NOTE: -1 here is because we have logic that uses (readpointer + 1) as an "exclusive stop key" in some datasets
+    return new Transaction(
+      Long.MAX_VALUE - 1, wp, new long[0], new long[0],
+      Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT);
+  }
+
+  @Override
+  public Transaction startShort(int timeout) {
+    return startShort();
+  }
+
+  @Override
+  public Transaction startLong() {
+    return startShort();
+  }
+
+  @Override
+  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) {
+    return true;
+  }
+
+  @Override
+  public boolean commit(Transaction tx) {
+    return true;
+  }
+
+  @Override
+  public void abort(Transaction tx) {
+    // do nothing
+  }
+
+  @Override
+  public boolean invalidate(long tx) {
+    return true;
+  }
+
+  @Override
+  public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException {
+    return tx;
+  }
+
+  @Override
+  public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
+    throw new TransactionCouldNotTakeSnapshotException("Not snapshot to take.");
+  }
+
+  @Override
+  public String status() {
+    return TxConstants.STATUS_OK;
+  }
+
+  @Override
+  public void resetState() {
+    // do nothing
+  }
+
+  @Override
+  public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
+    return true;
+  }
+
+  @Override
+  public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
+    return true;
+  }
+
+  @Override
+  public int getInvalidSize() {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java
new file mode 100644
index 0000000..2e396fe
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the in-memory implementation of the transaction system v2.
+ */
+package org.apache.tephra.inmemory;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java b/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java
new file mode 100644
index 0000000..86d2afa
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Slf4jReporter;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Default metrics collector implementation using
+ * <a href="http://metrics.dropwizard.io">Dropwizard Metrics</a> (formerly Yammer Metrics).
+ *
+ * <p>The reporting frequency for this collector can be configured by setting the
+ * {@code data.tx.metrics.period} configuration property to the desired period in seconds.
+ * </p>
+ */
+public class DefaultMetricsCollector extends TxMetricsCollector {
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricsCollector.class);
+
+  private final MetricRegistry metrics = new MetricRegistry();
+  private JmxReporter jmxReporter;
+  private ScheduledReporter reporter;
+  private int reportPeriod;
+  private ConcurrentMap<String, AtomicLong> gauges = Maps.newConcurrentMap();
+
+  @Override
+  public void configure(Configuration conf) {
+    // initialize selected output reporter
+    reportPeriod = conf.getInt(TxConstants.Metrics.REPORT_PERIOD_KEY, TxConstants.Metrics.REPORT_PERIOD_DEFAULT);
+    LOG.info("Configured metrics report to emit every {} seconds", reportPeriod);
+    // TODO: reporters should be pluggable based on injection
+    jmxReporter = JmxReporter.forRegistry(metrics).build();
+    reporter = Slf4jReporter.forRegistry(metrics)
+                            .outputTo(LoggerFactory.getLogger("tephra-metrics"))
+                            .convertRatesTo(TimeUnit.SECONDS)
+                            .convertDurationsTo(TimeUnit.MILLISECONDS)
+                            .build();
+  }
+
+
+  @Override
+  public void gauge(String metricName, int value, String... tags) {
+    AtomicLong gauge = gauges.get(metricName);
+    if (gauge == null) {
+      final AtomicLong newValue = new AtomicLong();
+      if (gauges.putIfAbsent(metricName, newValue) == null) {
+        // first to set the value, need to register the metric
+        metrics.register(metricName, new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return newValue.get();
+          }
+        });
+        gauge = newValue;
+      } else {
+        // someone else set it first
+        gauge = gauges.get(metricName);
+      }
+    }
+    gauge.set(value);
+  }
+
+  @Override
+  public void histogram(String metricName, int value) {
+    metrics.histogram(metricName).update(value);
+  }
+
+  @Override
+  public void rate(String metricName) {
+    metrics.meter(metricName).mark();
+  }
+
+  @Override
+  public void rate(String metricName, int count) {
+    metrics.meter(metricName).mark(count);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    jmxReporter.start();
+    reporter.start(reportPeriod, TimeUnit.SECONDS);
+    LOG.info("Started metrics reporter");
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    jmxReporter.stop();
+    reporter.stop();
+    LOG.info("Stopped metrics reporter");
+  }
+}
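
A hedged wiring sketch for this collector (the setup code and metric names are
illustrative): REPORT_PERIOD_KEY controls how often the SLF4J reporter emits,
and the Guava service lifecycle starts and stops both reporters.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tephra.TxConstants;
    import org.apache.tephra.metrics.DefaultMetricsCollector;

    Configuration conf = new Configuration();
    conf.setInt(TxConstants.Metrics.REPORT_PERIOD_KEY, 30); // report every 30 seconds
    DefaultMetricsCollector collector = new DefaultMetricsCollector();
    collector.configure(conf);   // must happen before the service is started
    collector.startAndWait();    // starts the JMX and SLF4J reporters
    collector.rate("tx.start");
    collector.gauge("tx.invalid", 0);
    collector.stopAndWait();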

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java b/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java
new file mode 100644
index 0000000..45668e1
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.metrics;
+
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Basic API for Tephra to support system metrics.
+ */
+public interface MetricsCollector extends Service {
+  /**
+   * Report a metric as an absolute value.
+   */
+  void gauge(String metricName, int value, String... tags);
+
+  /**
+   * Report a metric as a count over a given time duration.  This method uses an implicit count of 1.
+   */
+  void rate(String metricName);
+
+  /**
+   * Report a metric as a count over a given time duration.
+   */
+  void rate(String metricName, int count);
+
+  /**
+   * Report a metric calculating the distribution of the value.
+   */
+  void histogram(String metricName, int value);
+
+  /**
+   * Called before the collector service is started, allowing the collector to set up any
+   * required configuration.
+   */
+  void configure(Configuration conf);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java b/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java
new file mode 100644
index 0000000..4dc7e2f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.metrics;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Metrics collector class for emitting transaction-related metrics.
+ * Note: this default implementation is a no-op and doesn't emit any metrics.
+ */
+public class TxMetricsCollector extends AbstractIdleService implements MetricsCollector {
+
+  @Override
+  public void gauge(String metricName, int value, String... tags) {
+    //no-op
+  }
+
+  @Override
+  public void rate(String metricName) {
+    // no-op
+  }
+
+  @Override
+  public void rate(String metricName, int count) {
+    // no-op
+  }
+
+  @Override
+  public void histogram(String metricName, int value) {
+    // no-op
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    // no-op
+  }
+
+  /* Service methods */
+
+  @Override
+  protected void startUp() throws Exception {
+    // no-op
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/package-info.java b/tephra-core/src/main/java/org/apache/tephra/package-info.java
new file mode 100644
index 0000000..dea84ac
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains implementations of the transaction system v2.
+ */
+package org.apache.tephra;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
new file mode 100644
index 0000000..b1e0978
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Common implementation of a transaction log, backed by file-based reader and writer storage.  Classes extending
+ * this class must also implement {@link TransactionLogWriter} and {@link TransactionLogReader}.
+ */
+public abstract class AbstractTransactionLog implements TransactionLog {
+  /** Time limit, in milliseconds, of an append to the transaction log before we log it as "slow". */
+  private static final long SLOW_APPEND_THRESHOLD = 1000L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class);
+
+  private final AtomicLong logSequence = new AtomicLong();
+  private final MetricsCollector metricsCollector;
+  protected long timestamp;
+  private volatile boolean initialized;
+  private volatile boolean closed;
+  private AtomicLong syncedUpTo = new AtomicLong();
+  private List<Entry> pendingWrites = Lists.newLinkedList();
+  private TransactionLogWriter writer;
+
+  public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
+    this.timestamp = timestamp;
+    this.metricsCollector = metricsCollector;
+  }
+
+  /**
+   * Initializes the log file, opening a file writer.  Clients calling {@code init()} should ensure that they
+   * also call {@link #close()}.
+   * @throws java.io.IOException If an error is encountered initializing the file writer.
+   */
+  public synchronized void init() throws IOException {
+    if (initialized) {
+      return;
+    }
+    this.writer = createWriter();
+    this.initialized = true;
+  }
+
+  /**
+   * Returns a log writer to be used for appending any new {@link TransactionEdit} objects.
+   */
+  protected abstract TransactionLogWriter createWriter() throws IOException;
+
+  @Override
+  public abstract String getName();
+
+  @Override
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public void append(TransactionEdit edit) throws IOException {
+    long startTime = System.nanoTime();
+    synchronized (this) {
+      ensureAvailable();
+
+      Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
+
+      // add to pending edits
+      append(entry);
+    }
+
+    // wait for sync to complete
+    sync();
+    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
+    if (durationMillis > SLOW_APPEND_THRESHOLD) {
+      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
+    }
+  }
+
+  @Override
+  public void append(List<TransactionEdit> edits) throws IOException {
+    long startTime = System.nanoTime();
+    synchronized (this) {
+      ensureAvailable();
+
+      for (TransactionEdit edit : edits) {
+        Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
+
+        // add to pending edits
+        append(entry);
+      }
+    }
+
+    // wait for sync to complete
+    sync();
+    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
+    if (durationMillis > SLOW_APPEND_THRESHOLD) {
+      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
+    }
+  }
+
+  private void ensureAvailable() throws IOException {
+    if (closed) {
+      throw new IOException("Log " + getName() + " is already closed, cannot append!");
+    }
+    if (!initialized) {
+      init();
+    }
+  }
+
+  /*
+   * Appends a new write to pendingWrites. It is better to keep writes in our own
+   * queue than to push each one to the HDFS output stream, because
+   * HDFSOutputStream.writeChunk is not lightweight at all.
+   */
+  private void append(Entry e) throws IOException {
+    pendingWrites.add(e);
+  }
+
+  // Returns all currently pending writes. New writes
+  // will accumulate in a new list.
+  private List<Entry> getPendingWrites() {
+    synchronized (this) {
+      List<Entry> save = this.pendingWrites;
+      this.pendingWrites = new LinkedList<>();
+      return save;
+    }
+  }
+
+  private void sync() throws IOException {
+    // writes out pending entries to the write-ahead log
+    TransactionLogWriter tmpWriter = null;
+    long latestSeq = 0;
+    int entryCount = 0;
+    synchronized (this) {
+      if (closed) {
+        return;
+      }
+      // capture the current writer so the reference cannot change under us
+      tmpWriter = writer;
+
+      List<Entry> currentPending = getPendingWrites();
+      if (!currentPending.isEmpty()) {
+        tmpWriter.commitMarker(currentPending.size());
+      }
+
+      // write out all accumulated entries to log.
+      for (Entry e : currentPending) {
+        tmpWriter.append(e);
+        entryCount++;
+        latestSeq = Math.max(latestSeq, e.getKey().get());
+      }
+    }
+
+    long lastSynced = syncedUpTo.get();
+    // someone else might have already synced our edits, avoid double syncing
+    if (lastSynced < latestSeq) {
+      tmpWriter.sync();
+      metricsCollector.histogram("wal.sync.size", entryCount);
+      syncedUpTo.compareAndSet(lastSynced, latestSeq);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    // perform a final sync if there are any outstanding writes
+    if (!pendingWrites.isEmpty()) {
+      sync();
+    }
+    // NOTE: writer is lazy-inited, so it can be null
+    if (writer != null) {
+      this.writer.close();
+    }
+    this.closed = true;
+  }
+
+  public boolean isClosed() {
+    return closed;
+  }
+
+  @Override
+  public abstract TransactionLogReader getReader() throws IOException;
+
+  /**
+   * Represents an entry in the transaction log.  Each entry consists of a key, generated from an incrementing sequence
+   * number, and a value, the {@link TransactionEdit} being stored.
+   */
+  public static class Entry implements Writable {
+    private LongWritable key;
+    private TransactionEdit edit;
+
+    // for Writable
+    public Entry() {
+      this.key = new LongWritable();
+      this.edit = new TransactionEdit();
+    }
+
+    public Entry(LongWritable key, TransactionEdit edit) {
+      this.key = key;
+      this.edit = edit;
+    }
+
+    public LongWritable getKey() {
+      return this.key;
+    }
+
+    public TransactionEdit getEdit() {
+      return this.edit;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      this.key.write(out);
+      this.edit.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.key.readFields(in);
+      this.edit.readFields(in);
+    }
+  }
+}
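
A minimal subclass sketch (hypothetical, for illustration): a concrete log only
supplies a name, a writer factory, and a reader, while the batching of
concurrent append() calls into a single group sync() is inherited from the base
class.

    import org.apache.tephra.metrics.MetricsCollector;
    import org.apache.tephra.persist.AbstractTransactionLog;
    import org.apache.tephra.persist.TransactionLogReader;
    import org.apache.tephra.persist.TransactionLogWriter;

    import java.io.IOException;

    public class FileTransactionLog extends AbstractTransactionLog {
      private final String name;

      public FileTransactionLog(String name, long timestamp, MetricsCollector metrics) {
        super(timestamp, metrics);
        this.name = name;
      }

      @Override
      public String getName() {
        return name;
      }

      @Override
      protected TransactionLogWriter createWriter() throws IOException {
        // open a file-backed TransactionLogWriter here
        throw new UnsupportedOperationException("writer factory not sketched");
      }

      @Override
      public TransactionLogReader getReader() throws IOException {
        // open a file-backed TransactionLogReader here
        throw new UnsupportedOperationException("reader factory not sketched");
      }
    }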

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java
new file mode 100644
index 0000000..1c51ccd
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Common base class for all transaction storage implementations. This implements logic to prefix a snapshot
+ * with a version when encoding, and to select the correct codec for decoding based on this version prefix.
+ */
+public abstract class AbstractTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
+
+  protected final SnapshotCodecProvider codecProvider;
+
+  protected AbstractTransactionStateStorage(SnapshotCodecProvider codecProvider) {
+    this.codecProvider = codecProvider;
+  }
+
+  @Override
+  public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
+    codecProvider.encode(out, snapshot);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java b/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java
new file mode 100644
index 0000000..c4f02e5
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.tephra.TxConstants;
+
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Class to read and write commit markers used in {@link HDFSTransactionLogReaderV2} and above.
+ */
+public class CommitMarkerCodec implements Closeable {
+  private static final byte[] KEY_BYTES = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED.getBytes(Charsets.UTF_8);
+  private final DataOutputBuffer rawKey;
+  private final DataOutputBuffer rawValue;
+  private SequenceFile.ValueBytes valueBytes;
+
+  public CommitMarkerCodec() {
+    this.rawKey = new DataOutputBuffer();
+    this.rawValue = new DataOutputBuffer();
+  }
+
+  @Override
+  public void close() throws IOException {
+    rawKey.close();
+    rawValue.close();
+  }
+
+  // 1. Returns the count when the marker was written correctly.
+  // 2. If the data is incorrect (e.g. wrong key, or a mismatch in key/value/record length), we throw an
+  // IOException, since this indicates a corrupted log file.
+  // 3. If the data is incomplete, we throw an EOFException, which is handled gracefully by the calling method
+  // since we can recover without any consequence.
+  public int readMarker(SequenceFile.Reader reader) throws IOException {
+    if (valueBytes == null) {
+      valueBytes = reader.createValueBytes();
+    }
+    rawKey.reset();
+    rawValue.reset();
+
+    // valueBytes need not be reset since nextRaw call does it (and it is a private method)
+    int status = reader.nextRaw(rawKey, valueBytes);
+
+    // if we reach EOF, return -1
+    if (status == -1) {
+      return -1;
+    }
+
+    // Check if the marker key is valid and return the count
+    if (isMarkerValid()) {
+      valueBytes.writeUncompressedBytes(rawValue);
+      rawValue.flush();
+      // rawValue.getData() may return a larger byte array but Ints.fromByteArray will only read the first four bytes
+      return Ints.fromByteArray(rawValue.getData());
+    }
+
+    // EOF was not reached and the marker is not valid, so throw an IOException since we can't make progress
+    throw new IOException(String.format("Invalid key for num entries appended: found %s, expected %s",
+                                        new String(rawKey.getData(), 0, rawKey.getLength(), Charsets.UTF_8),
+                                        TxConstants.TransactionLog.NUM_ENTRIES_APPENDED));
+  }
+
+  private boolean isMarkerValid() {
+    // rawKey should have the expected length and the matching bytes should start at index 0
+    return rawKey.getLength() == KEY_BYTES.length && Bytes.indexOf(rawKey.getData(), KEY_BYTES) == 0;
+  }
+
+  public static void writeMarker(SequenceFile.Writer writer, int count) throws IOException {
+    writer.appendRaw(KEY_BYTES, 0, KEY_BYTES.length, new CommitEntriesCount(count));
+  }
+
+  @VisibleForTesting
+  static final class CommitEntriesCount implements SequenceFile.ValueBytes {
+    private final int numEntries;
+
+    public CommitEntriesCount(int numEntries) {
+      this.numEntries = numEntries;
+    }
+
+    @Override
+    public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
+      outStream.write(Ints.toByteArray(numEntries));
+    }
+
+    @Override
+    public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
+      throw new IllegalArgumentException("Commit Entries count writing is not expected to be compressed.");
+    }
+
+    @Override
+    public int getSize() {
+      return Ints.BYTES;
+    }
+  }
+}
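
A round-trip sketch for the marker (the SequenceFile writer/reader setup is
assumed and not shown): writeMarker records how many entries follow, and
readMarker returns that count, returns -1 at a clean EOF, or throws on
corruption.

    CommitMarkerCodec.writeMarker(seqFileWriter, 42);  // "42 entries appended"
    seqFileWriter.syncFs();

    try (CommitMarkerCodec codec = new CommitMarkerCodec()) {
      int count = codec.readMarker(seqFileReader);     // 42
    }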

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
new file mode 100644
index 0000000..ba781ac
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * Allows reading from and writing to a transaction write-ahead log stored in HDFS.
+ */
+public class HDFSTransactionLog extends AbstractTransactionLog {
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLog.class);
+
+  private final FileSystem fs;
+  private final Configuration hConf;
+  private final Path logPath;
+
+  /**
+   * Creates a new HDFS-backed write-ahead log for storing transaction state.
+   * @param fs Open FileSystem instance for opening log files in HDFS.
+   * @param hConf HDFS cluster configuration.
+   * @param logPath Path to the log file.
+   */
+  public HDFSTransactionLog(final FileSystem fs, final Configuration hConf,
+                            final Path logPath, long timestamp, MetricsCollector metricsCollector) {
+    super(timestamp, metricsCollector);
+    this.fs = fs;
+    this.hConf = hConf;
+    this.logPath = logPath;
+  }
+
+  @Override
+  protected TransactionLogWriter createWriter() throws IOException {
+    return new LogWriter(fs, hConf, logPath);
+  }
+
+  @Override
+  public String getName() {
+    return logPath.getName();
+  }
+
+  @Override
+  public TransactionLogReader getReader() throws IOException {
+    FileStatus status = fs.getFileStatus(logPath);
+    long length = status.getLen();
+
+    TransactionLogReader reader = null;
+    // check if this file needs to be recovered due to failure
+    // Check for possibly empty file. With appends, currently Hadoop reports a
+    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
+    // HDFS-878 is committed.
+    if (length <= 0) {
+      LOG.warn("File " + logPath + " might be still open, length is 0");
+    }
+
+    HDFSUtil hdfsUtil = new HDFSUtil();
+    hdfsUtil.recoverFileLease(fs, logPath, hConf);
+    try {
+      FileStatus newStatus = fs.getFileStatus(logPath);
+      LOG.info("New file size for " + logPath + " is " + newStatus.getLen());
+      SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf);
+      reader = new HDFSTransactionLogReaderSupplier(fileReader).get();
+    } catch (EOFException e) {
+      if (length <= 0) {
+        // TODO should we ignore an empty, not-last log file if skip.errors
+        // is false? Either way, the caller should decide what to do. E.g.
+        // ignore if this is the last log in sequence.
+        // TODO is this scenario still possible if the log has been
+        // recovered (i.e. closed)
+        LOG.warn("Could not open " + logPath + " for reading. File is empty", e);
+        return null;
+      } else {
+        // EOFException is ignored: an incomplete trailing log yields no reader
+        return null;
+      }
+    }
+    return reader;
+  }
+
+  @VisibleForTesting
+  static final class LogWriter implements TransactionLogWriter {
+    private final SequenceFile.Writer internalWriter;
+    public LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException {
+      // TODO: retry a few times to ride over transient failures?
+      SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+      metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
+                   new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
+
+      this.internalWriter = SequenceFile.createWriter(fs, hConf, logPath, LongWritable.class, TransactionEdit.class,
+                                                      SequenceFile.CompressionType.NONE, null, null, metadata);
+      LOG.debug("Created a new TransactionLog writer for " + logPath);
+    }
+
+    @Override
+    public void append(Entry entry) throws IOException {
+      internalWriter.append(entry.getKey(), entry.getEdit());
+    }
+
+    @Override
+    public void commitMarker(int count) throws IOException {
+      CommitMarkerCodec.writeMarker(internalWriter, count);
+    }
+
+    @Override
+    public void sync() throws IOException {
+      internalWriter.syncFs();
+    }
+
+    @Override
+    public void close() throws IOException {
+      internalWriter.close();
+    }
+  }
+}
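
For illustration, here is a minimal, self-contained sketch of the version-stamping pattern that LogWriter above relies on: the writer records the log format version in the SequenceFile metadata, and a reader recovers it before interpreting any entries. The literal "version" key, the value 2, the Text value type, and the /tmp path are stand-ins for TxConstants.TransactionLog.VERSION_KEY, CURRENT_VERSION, and a real log path.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class VersionedLogSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/tmp/txlog.sketch");

        // writer side: stamp the format version into the file's metadata
        SequenceFile.Metadata metadata = new SequenceFile.Metadata();
        metadata.set(new Text("version"), new Text(Byte.toString((byte) 2)));
        SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, conf, path, LongWritable.class, Text.class,
            SequenceFile.CompressionType.NONE, null, null, metadata);
        writer.append(new LongWritable(1L), new Text("edit-1"));
        writer.close();

        // reader side: recover the version before interpreting the entries
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        Text version = reader.getMetadata().get(new Text("version"));
        System.out.println("log version: " + (version == null ? "1 (implied)" : version.toString()));
        reader.close();
      }
    }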

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
new file mode 100644
index 0000000..a517903
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.tephra.TxConstants;
+
+/**
+ * Supplies the correct {@link TransactionLogReader} implementation for reading an HDFS transaction
+ * log, choosing it based on the version recorded in the log's SequenceFile metadata.
+ */
+public class HDFSTransactionLogReaderSupplier implements Supplier<TransactionLogReader> {
+  private final SequenceFile.Reader reader;
+  private final byte version;
+  private TransactionLogReader logReader;
+
+  public HDFSTransactionLogReaderSupplier(SequenceFile.Reader reader) {
+    this.reader = reader;
+    Text versionInfo = reader.getMetadata().get(new Text(TxConstants.TransactionLog.VERSION_KEY));
+    this.version = versionInfo == null ? 1 : Byte.parseByte(versionInfo.toString());
+  }
+
+  @Override
+  public TransactionLogReader get() {
+    if (logReader != null) {
+      return logReader;
+    }
+
+    switch (version) {
+      case 2:
+        logReader = new HDFSTransactionLogReaderV2(reader);
+        return logReader;
+      case 1:
+        logReader = new HDFSTransactionLogReaderV1(reader);
+        return logReader;
+      default:
+        throw new IllegalArgumentException(String.format("Invalid version %s found in the Transaction Log", version));
+    }
+  }
+}
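
For illustration, this is how HDFSTransactionLog.getReader() above obtains a version-appropriate reader; a short sketch assuming fs, hConf, and logPath are already set up as in that method:

    // open the underlying SequenceFile and let the supplier pick the reader version;
    // a missing VERSION_KEY in the metadata implies a v1 log
    SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf);
    TransactionLogReader logReader = new HDFSTransactionLogReaderSupplier(fileReader).get();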

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
new file mode 100644
index 0000000..faefaec
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * {@link TransactionLogReader} that can read the v1 (default) version of transaction logs. Such logs
+ * consist of a plain sequence of {@link TransactionEdit}s, without commit markers.
+ */
+public class HDFSTransactionLogReaderV1 implements TransactionLogReader {
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV1.class);
+  private final SequenceFile.Reader reader;
+  private final LongWritable key;
+  private boolean closed;
+
+  public HDFSTransactionLogReaderV1(SequenceFile.Reader reader) {
+    this.reader = reader;
+    this.key = new LongWritable();
+  }
+
+  @Override
+  public TransactionEdit next() throws IOException {
+    return next(new TransactionEdit());
+  }
+
+  @Override
+  public TransactionEdit next(TransactionEdit reuse) throws IOException {
+    if (closed) {
+      return null;
+    }
+
+    try {
+      boolean successful = reader.next(key, reuse);
+      return successful ? reuse : null;
+    } catch (EOFException e) {
+      LOG.warn("Hit an unexpected EOF while trying to read the Transaction Edit. Skipping the entry.", e);
+      return null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    reader.close();
+    closed = true;
+  }
+}
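
For illustration, a typical consumption loop over a log, valid for any TransactionLogReader; logReader is assumed to have been obtained as sketched above:

    try {
      TransactionEdit edit;
      while ((edit = logReader.next()) != null) {
        // apply each edit in log order, e.g. replay it into the transaction state
        System.out.println(edit);
      }
    } finally {
      logReader.close();
    }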

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
new file mode 100644
index 0000000..ce50da8
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * {@link TransactionLogReader} that can read the v2 version of transaction logs. The logs are expected to
+ * contain commit markers, each indicating the size of the batch of {@link TransactionEdit}s that follows it
+ * and was synced as a unit. If the expected number of {@link TransactionEdit}s is not present, that whole
+ * batch of {@link TransactionEdit}s is discarded.
+ */
+public class HDFSTransactionLogReaderV2 implements TransactionLogReader {
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class);
+
+  private final SequenceFile.Reader reader;
+  private final Queue<TransactionEdit> transactionEdits;
+  private final CommitMarkerCodec commitMarkerCodec;
+  private final LongWritable key;
+
+  private boolean closed;
+
+  public HDFSTransactionLogReaderV2(SequenceFile.Reader reader) {
+    this.reader = reader;
+    this.transactionEdits = new ArrayDeque<>();
+    this.key = new LongWritable();
+    this.commitMarkerCodec = new CommitMarkerCodec();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    try {
+      commitMarkerCodec.close();
+    } finally {
+      reader.close();
+      closed = true;
+    }
+  }
+
+  @Override
+  public TransactionEdit next() throws IOException {
+    return next(null);
+  }
+
+  @Override
+  public TransactionEdit next(TransactionEdit reuse) throws IOException {
+    if (closed) {
+      return null;
+    }
+
+    if (!transactionEdits.isEmpty()) {
+      return transactionEdits.remove();
+    }
+
+    // Fetch the 'marker' and read 'marker' number of edits
+    populateTransactionEdits();
+    return transactionEdits.poll();
+  }
+
+  private void populateTransactionEdits() throws IOException {
+    // read the marker to determine numEntries to read.
+    int numEntries = 0;
+    try {
+      // readMarker can throw an EOFException on an incomplete commit marker; no further action is
+      // required, since we can safely ignore it and treat the log as exhausted
+      numEntries = commitMarkerCodec.readMarker(reader);
+    } catch (EOFException e) {
+      LOG.warn("Reached EOF in log while trying to read commit marker", e);
+    }
+
+    for (int i = 0; i < numEntries; i++) {
+      TransactionEdit edit = new TransactionEdit();
+      try {
+        if (reader.next(key, edit)) {
+          transactionEdits.add(edit);
+        } else {
+          throw new EOFException("Attempt to read TransactionEdit failed.");
+        }
+      } catch (EOFException e) {
+        // we have reached EOF before reading back numEntries; clear the partial batch and return,
+        // so that an incompletely synced batch is never surfaced to the caller
+        LOG.warn("Reached EOF in log before reading {} entries. Ignoring all {} edits since the last marker",
+                 numEntries, transactionEdits.size(), e);
+        transactionEdits.clear();
+        return;
+      }
+    }
+  }
+}
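
For illustration, a sketch of the write path that produces the v2 layout this reader consumes: a commit marker carrying the batch size, then that many edits, then a sync. The actual batching lives in AbstractTransactionLog; makeEntry() here is a hypothetical helper producing the writer's Entry type, and LogWriter is the package-private writer defined in HDFSTransactionLog, so this compiles only within org.apache.tephra.persist.

    TransactionLogWriter writer = new HDFSTransactionLog.LogWriter(fs, hConf, logPath);
    try {
      int batchSize = 3;
      writer.commitMarker(batchSize);   // marker first: tells the reader how many edits follow
      for (int i = 0; i < batchSize; i++) {
        writer.append(makeEntry(i));    // hypothetical helper wrapping a key and a TransactionEdit
      }
      writer.sync();                    // a batch only becomes visible to readers once synced
    } finally {
      writer.close();
    }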

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java
new file mode 100644
index 0000000..d751ae2
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.CountingInputStream;
+import com.google.common.primitives.Longs;
+import com.google.inject.Inject;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.util.ConfigurationFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Handles persistence of transaction snapshot and logs to a directory in HDFS.
+ *
+ * The directory used for file storage is configured using the {@code data.tx.snapshot.dir} configuration property.
+ * Both snapshot and transaction log files are suffixed with a timestamp to allow easy ordering.  Snapshot files
+ * are written with the filename "snapshot.&lt;timestamp&gt;".  Transaction log files are written with the filename
+ * "txlog.&lt;timestamp&gt;".
+ */
+public class HDFSTransactionStateStorage extends AbstractTransactionStateStorage {
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionStateStorage.class);
+
+  private static final String SNAPSHOT_FILE_PREFIX = "snapshot.";
+  private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress.snapshot.";
+  private static final String LOG_FILE_PREFIX = "txlog.";
+
+  private static final PathFilter SNAPSHOT_FILE_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(SNAPSHOT_FILE_PREFIX);
+    }
+  };
+
+  // buffer size used for HDFS reads and writes
+  private static final int BUFFER_SIZE = 16384;
+
+  private final Configuration hConf;
+  private final String configuredSnapshotDir;
+  private final MetricsCollector metricsCollector;
+  private FileSystem fs;
+  private Path snapshotDir;
+
+  @Inject
+  public HDFSTransactionStateStorage(Configuration hConf, SnapshotCodecProvider codecProvider,
+                                     MetricsCollector metricsCollector) {
+    super(codecProvider);
+    this.hConf = hConf;
+    this.configuredSnapshotDir = hConf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
+    this.metricsCollector = metricsCollector;
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    Preconditions.checkState(configuredSnapshotDir != null,
+        "Snapshot directory is not configured.  Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_DIR +
+        " in configuration.");
+    String hdfsUser = hConf.get(TxConstants.Manager.CFG_TX_HDFS_USER);
+    if (hdfsUser == null || UserGroupInformation.isSecurityEnabled()) {
+      if (hdfsUser != null && LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring configuration {}={}, running on secure Hadoop",
+                  TxConstants.Manager.CFG_TX_HDFS_USER, hdfsUser);
+      }
+      // NOTE: this storage can be started multiple times. Since HDFS keeps a per-JVM FileSystem
+      //       cache, we create a new instance rather than risk getting a cached one that is already closed
+      fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf);
+    } else {
+      fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf, hdfsUser);
+    }
+    snapshotDir = new Path(configuredSnapshotDir);
+    LOG.info("Using snapshot dir " + snapshotDir);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    fs.close();
+  }
+
+  @Override
+  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
+    // create a temporary file, and save the snapshot
+    Path snapshotTmpFile = new Path(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
+    LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile);
+
+    FSDataOutputStream out = fs.create(snapshotTmpFile, false, BUFFER_SIZE);
+    // encode the snapshot and stream the serialized version to the file
+    try {
+      codecProvider.encode(out, snapshot);
+    } finally {
+      out.close();
+    }
+
+    // move the temporary file into place with the correct filename
+    Path finalFile = new Path(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
+    if (!fs.rename(snapshotTmpFile, finalFile)) {
+      throw new IOException("Failed to rename temporary snapshot file " + snapshotTmpFile + " to " + finalFile);
+    }
+    LOG.debug("Completed snapshot to file {}", finalFile);
+  }
+
+  @Override
+  public TransactionSnapshot getLatestSnapshot() throws IOException {
+    InputStream in = getLatestSnapshotInputStream();
+    if (in == null) {
+      return null;
+    }
+    try {
+      return readSnapshotInputStream(in);
+    } finally {
+      in.close();
+    }
+  }
+
+  @Override
+  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
+    InputStream in = getLatestSnapshotInputStream();
+    if (in == null) {
+      return null;
+    }
+    try {
+      return readTransactionVisibilityStateFromInputStream(in);
+    } finally {
+      in.close();
+    }
+  }
+
+  private InputStream getLatestSnapshotInputStream() throws IOException {
+    TimestampedFilename[] snapshots = listSnapshotFiles();
+    Arrays.sort(snapshots);
+    if (snapshots.length > 0) {
+      // last is the most recent
+      return fs.open(snapshots[snapshots.length - 1].getPath(), BUFFER_SIZE);
+    }
+
+    LOG.info("No snapshot files found in {}", snapshotDir);
+    return null;
+  }
+
+  private TransactionSnapshot readSnapshotInputStream(InputStream in) throws IOException {
+    CountingInputStream countingIn = new CountingInputStream(in);
+    TransactionSnapshot snapshot = codecProvider.decode(countingIn);
+    LOG.info("Read encoded transaction snapshot of {} bytes", countingIn.getCount());
+    return snapshot;
+  }
+
+  private TransactionVisibilityState readTransactionVisibilityStateFromInputStream(InputStream in) throws IOException {
+    CountingInputStream countingIn = new CountingInputStream(in);
+    TransactionVisibilityState state = codecProvider.decodeTransactionVisibilityState(countingIn);
+    LOG.info("Read encoded transaction snapshot of {} bytes", countingIn.getCount());
+    return state;
+  }
+
+  private TransactionSnapshot readSnapshotFile(Path filePath) throws IOException {
+    FSDataInputStream in = fs.open(filePath, BUFFER_SIZE);
+    try {
+      return readSnapshotInputStream(in);
+    } finally {
+      in.close();
+    }
+  }
+
+  private TimestampedFilename[] listSnapshotFiles() throws IOException {
+    FileStatus[] snapshotFileStatuses = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER);
+    TimestampedFilename[] snapshotFiles = new TimestampedFilename[snapshotFileStatuses.length];
+    for (int i = 0; i < snapshotFileStatuses.length; i++) {
+      snapshotFiles[i] = new TimestampedFilename(snapshotFileStatuses[i].getPath());
+    }
+    return snapshotFiles;
+  }
+
+  @Override
+  public long deleteOldSnapshots(int numberToKeep) throws IOException {
+    TimestampedFilename[] snapshots = listSnapshotFiles();
+    if (snapshots.length == 0) {
+      return -1;
+    }
+    Arrays.sort(snapshots, Collections.reverseOrder());
+    if (snapshots.length <= numberToKeep) {
+      // nothing to remove, oldest timestamp is the last snapshot
+      return snapshots[snapshots.length - 1].getTimestamp();
+    }
+    int toRemoveCount = snapshots.length - numberToKeep;
+    TimestampedFilename[] toRemove = new TimestampedFilename[toRemoveCount];
+    System.arraycopy(snapshots, numberToKeep, toRemove, 0, toRemoveCount);
+
+    for (TimestampedFilename f : toRemove) {
+      LOG.debug("Removing old snapshot file {}", f.getPath());
+      fs.delete(f.getPath(), false);
+    }
+    long oldestTimestamp = snapshots[numberToKeep - 1].getTimestamp();
+    LOG.debug("Removed {} old snapshot files prior to {}", toRemoveCount, oldestTimestamp);
+    return oldestTimestamp;
+  }
+
+  @Override
+  public List<String> listSnapshots() throws IOException {
+    FileStatus[] files = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER);
+    return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() {
+      @Nullable
+      @Override
+      public String apply(@Nullable FileStatus input) {
+        return input.getPath().getName();
+      }
+    });
+  }
+
+  @Override
+  public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
+    FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(timestamp, Long.MAX_VALUE));
+    TimestampedFilename[] timestampedFiles = new TimestampedFilename[statuses.length];
+    for (int i = 0; i < statuses.length; i++) {
+      timestampedFiles[i] = new TimestampedFilename(statuses[i].getPath());
+    }
+    return Lists.transform(Arrays.asList(timestampedFiles), new Function<TimestampedFilename, TransactionLog>() {
+      @Nullable
+      @Override
+      public TransactionLog apply(@Nullable TimestampedFilename input) {
+        return openLog(input.getPath(), input.getTimestamp());
+      }
+    });
+  }
+
+  @Override
+  public TransactionLog createLog(long timestamp) throws IOException {
+    Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timestamp);
+    return openLog(newLog, timestamp);
+  }
+
+  private TransactionLog openLog(Path path, long timestamp) {
+    return new HDFSTransactionLog(fs, hConf, path, timestamp, metricsCollector);
+  }
+
+  @Override
+  public void deleteLogsOlderThan(long timestamp) throws IOException {
+    FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(0, timestamp));
+    int removedCnt = 0;
+    for (FileStatus status : statuses) {
+      LOG.debug("Removing old transaction log {}", status.getPath());
+      if (fs.delete(status.getPath(), false)) {
+        removedCnt++;
+      } else {
+        LOG.error("Failed to delete transaction log file {}", status.getPath());
+      }
+    }
+    LOG.debug("Removed {} transaction logs older than {}", removedCnt, timestamp);
+  }
+
+  @Override
+  public void setupStorage() throws IOException {
+    if (!fs.exists(snapshotDir)) {
+      LOG.info("Creating snapshot dir at {}", snapshotDir);
+      fs.mkdirs(snapshotDir);
+    } else {
+      Preconditions.checkState(fs.isDirectory(snapshotDir),
+                               "Configured snapshot directory " + snapshotDir + " is not a directory!");
+    }
+  }
+
+  @Override
+  public List<String> listLogs() throws IOException {
+    FileStatus[] files = fs.listStatus(snapshotDir, new LogFileFilter(0, Long.MAX_VALUE));
+    return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() {
+      @Nullable
+      @Override
+      public String apply(@Nullable FileStatus input) {
+        return input.getPath().getName();
+      }
+    });
+  }
+
+  @Override
+  public String getLocation() {
+    return snapshotDir.toString();
+  }
+
+  private static class LogFileFilter implements PathFilter {
+    // starting time of files to include (inclusive)
+    private final long startTime;
+    // ending time of files to include (exclusive)
+    private final long endTime;
+
+    public LogFileFilter(long startTime, long endTime) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      if (path.getName().startsWith(LOG_FILE_PREFIX)) {
+        String[] parts = path.getName().split("\\.");
+        if (parts.length == 2) {
+          try {
+            long fileTime = Long.parseLong(parts[1]);
+            return fileTime >= startTime && fileTime < endTime;
+          } catch (NumberFormatException ignored) {
+            LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", path.getName());
+          }
+        }
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Represents a filename composed of a prefix and a ".timestamp" suffix.  This is useful for manipulating both
+   * snapshot and transaction log filenames.
+   */
+  private static class TimestampedFilename implements Comparable<TimestampedFilename> {
+    private Path path;
+    private String prefix;
+    private long timestamp;
+
+    public TimestampedFilename(Path path) {
+      this.path = path;
+      String[] parts = path.getName().split("\\.");
+      if (parts.length != 2) {
+        throw new IllegalArgumentException("Filename " + path.getName() +
+            " did not match the expected pattern prefix.timestamp");
+      }
+      prefix = parts[0];
+      timestamp = Long.parseLong(parts[1]);
+    }
+
+    public Path getPath() {
+      return path;
+    }
+
+    public String getPrefix() {
+      return prefix;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public int compareTo(TimestampedFilename other) {
+      int res = prefix.compareTo(other.getPrefix());
+      if (res == 0) {
+        res = Longs.compare(timestamp, other.getTimestamp());
+      }
+      return res;
+    }
+  }
+
+  // TODO move this out as a separate command line tool
+  private enum CLIMode { SNAPSHOT, TXLOG }
+  /**
+   * Reads a transaction state snapshot or transaction log from HDFS and prints the entries to stdout.
+   *
+   * Supports the following options:
+   *    -s    read snapshot state (defaults to the latest)
+   *    -l    read a transaction log
+   *    [filename]  reads the given file
+   * @param args command line arguments; see the supported options above
+   */
+  public static void main(String[] args) {
+    List<String> filenames = Lists.newArrayList();
+    CLIMode mode = null;
+    for (String arg : args) {
+      if ("-s".equals(arg)) {
+        mode = CLIMode.SNAPSHOT;
+      } else if ("-l".equals(arg)) {
+        mode = CLIMode.TXLOG;
+      } else if ("-h".equals(arg)) {
+        printUsage(null);
+      } else {
+        filenames.add(arg);
+      }
+    }
+
+    if (mode == null) {
+      printUsage("ERROR: Either -s or -l is required to set mode.", 1);
+    }
+
+    Configuration config = new ConfigurationFactory().get();
+
+    // Use the no-op metrics collector.  We are being run as a command line tool, so there are no relevant metrics
+    // to report
+    HDFSTransactionStateStorage storage =
+      new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector());
+    storage.startAndWait();
+    try {
+      switch (mode) {
+        case SNAPSHOT:
+          try {
+            if (filenames.isEmpty()) {
+              TransactionSnapshot snapshot = storage.getLatestSnapshot();
+              printSnapshot(snapshot);
+            }
+            for (String file : filenames) {
+              Path path = new Path(file);
+              TransactionSnapshot snapshot = storage.readSnapshotFile(path);
+              printSnapshot(snapshot);
+              System.out.println();
+            }
+          } catch (IOException ioe) {
+            System.err.println("Error reading snapshot files: " + ioe.getMessage());
+            ioe.printStackTrace();
+            System.exit(1);
+          }
+          break;
+        case TXLOG:
+          if (filenames.isEmpty()) {
+            printUsage("ERROR: At least one transaction log filename is required!", 1);
+          }
+          for (String file : filenames) {
+            TimestampedFilename timestampedFilename = new TimestampedFilename(new Path(file));
+            TransactionLog log = storage.openLog(timestampedFilename.getPath(), timestampedFilename.getTimestamp());
+            printLog(log);
+            System.out.println();
+          }
+          break;
+      }
+    } finally {
+      storage.stop();
+    }
+  }
+
+  private static void printUsage(String message) {
+    printUsage(message, 0);
+  }
+
+  private static void printUsage(String message, int exitCode) {
+    if (message != null) {
+      System.out.println(message);
+    }
+    System.out.println("Usage: java " + HDFSTransactionStateStorage.class.getName() + " (-s|-l) file1 [file2...]");
+    System.out.println();
+    System.out.println("\t-s\tRead files as transaction state snapshots (will default to latest if no file given)");
+    System.out.println("\t-l\tRead files as transaction logs [filename is required]");
+    System.out.println("\t-h\tPrint this message");
+    System.exit(exitCode);
+  }
+
+  private static void printSnapshot(TransactionSnapshot snapshot) {
+    Date snapshotDate = new Date(snapshot.getTimestamp());
+    System.out.println("TransactionSnapshot at " + snapshotDate.toString());
+    System.out.println("\t" + snapshot.toString());
+  }
+
+  private static void printLog(TransactionLog log) {
+    try {
+      System.out.println("TransactionLog " + log.getName());
+      TransactionLogReader reader = log.getReader();
+      TransactionEdit edit;
+      long seq = 0;
+      while ((edit = reader.next()) != null) {
+        System.out.println(String.format("    %d: %s", seq++, edit.toString()));
+      }
+    } catch (IOException ioe) {
+      System.err.println("ERROR reading log " + log.getName() + ": " + ioe.getMessage());
+      ioe.printStackTrace();
+    }
+  }
+}
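
For illustration, a minimal sketch of reading back persisted state, constructed the same way as in main() above; the snapshot directory value is illustrative:

    Configuration config = new ConfigurationFactory().get();
    config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tephra/snapshots");

    HDFSTransactionStateStorage storage =
      new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector());
    storage.startAndWait();
    try {
      TransactionSnapshot snapshot = storage.getLatestSnapshot();
      if (snapshot != null) {
        // logs written after the snapshot still need to be replayed on recovery
        for (TransactionLog log : storage.getLogsSince(snapshot.getTimestamp())) {
          System.out.println("Would replay " + log.getName());
        }
      }
    } finally {
      storage.stop();
    }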