You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:44 UTC

[44/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java b/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java
deleted file mode 100644
index 0d10eb3..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Autogenerated by Thrift Compiler (0.9.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- *  @generated
- */
-package co.cask.tephra.distributed.thrift;
-
-
-import java.util.Map;
-import java.util.HashMap;
-import org.apache.thrift.TEnum;
-
-public enum TTransactionType implements org.apache.thrift.TEnum {
-  SHORT(1),
-  LONG(2);
-
-  private final int value;
-
-  private TTransactionType(int value) {
-    this.value = value;
-  }
-
-  /**
-   * Get the integer value of this enum value, as defined in the Thrift IDL.
-   */
-  public int getValue() {
-    return value;
-  }
-
-  /**
-   * Find a the enum type by its integer value, as defined in the Thrift IDL.
-   * @return null if the value is not found.
-   */
-  public static TTransactionType findByValue(int value) { 
-    switch (value) {
-      case 1:
-        return SHORT;
-      case 2:
-        return LONG;
-      default:
-        return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java b/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java
deleted file mode 100644
index 6de6f87..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Autogenerated by Thrift Compiler (0.9.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- *  @generated
- */
-package co.cask.tephra.distributed.thrift;
-
-
-public enum TVisibilityLevel implements org.apache.thrift.TEnum {
-  SNAPSHOT(1),
-  SNAPSHOT_EXCLUDE_CURRENT(2),
-  SNAPSHOT_ALL(3);
-
-  private final int value;
-
-  private TVisibilityLevel(int value) {
-    this.value = value;
-  }
-
-  /**
-   * Get the integer value of this enum value, as defined in the Thrift IDL.
-   */
-  public int getValue() {
-    return value;
-  }
-
-  /**
-   * Find a the enum type by its integer value, as defined in the Thrift IDL.
-   * @return null if the value is not found.
-   */
-  public static TVisibilityLevel findByValue(int value) { 
-    switch (value) {
-      case 1:
-        return SNAPSHOT;
-      case 2:
-        return SNAPSHOT_EXCLUDE_CURRENT;
-      case 3:
-        return SNAPSHOT_ALL;
-      default:
-        return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java
deleted file mode 100644
index 1731b0c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.inmemory;
-
-import co.cask.tephra.InvalidTruncateTimeException;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Implementation of the tx system client that doesn't talk to any global service and tries to do its best to meet the
- * tx system requirements/expectations. In fact it implements enough logic to support running flows (when each flowlet
- * uses its own detached tx system client, without talking to each other and sharing any state) with "process exactly
- * once" guarantee if no failures happen.
- *
- * NOTE: Will NOT detect conflicts. May leave inconsistent state when process crashes. Does NOT provide even read
- *       isolation guarantees.
- *
- * Good for performance testing. For demoing high throughput. For use-cases with relaxed tx guarantees.
- */
-public class DetachedTxSystemClient implements TransactionSystemClient {
-  // Dataset and queue logic relies on tx id to grow monotonically even after restart. Hence we need to start with
-  // value that is for sure bigger than the last one used before restart.
-  // NOTE: with code below we assume we don't do more than InMemoryTransactionManager.MAX_TX_PER_MS tx/ms
-  //       by single client
-  private AtomicLong generator = new AtomicLong(System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS);
-
-  @Override
-  public Transaction startShort() {
-    long wp = getWritePointer();
-    // NOTE: -1 here is because we have logic that uses (readpointer + 1) as a "exclusive stop key" in some datasets
-    return new Transaction(
-      Long.MAX_VALUE - 1, wp, new long[0], new long[0],
-      Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT);
-  }
-
-  private long getWritePointer() {
-    long wp = generator.incrementAndGet();
-    // NOTE: using InMemoryTransactionManager.MAX_TX_PER_MS to be at least close to real one
-    long now = System.currentTimeMillis();
-    if (wp < now * TxConstants.MAX_TX_PER_MS) {
-      // trying to advance to align with timestamp, but only once: if failed, we'll just try again later with next tx
-      long advanced = now * TxConstants.MAX_TX_PER_MS;
-      if (generator.compareAndSet(wp, advanced)) {
-        wp = advanced;
-      }
-    }
-    return wp;
-  }
-
-  @Override
-  public Transaction startShort(int timeout) {
-    return startShort();
-  }
-
-  @Override
-  public Transaction startLong() {
-    return startShort();
-  }
-
-  @Override
-  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) {
-    return true;
-  }
-
-  @Override
-  public boolean commit(Transaction tx) {
-    return true;
-  }
-
-  @Override
-  public void abort(Transaction tx) {
-    // do nothing
-  }
-
-  @Override
-  public boolean invalidate(long tx) {
-    return true;
-  }
-
-  @Override
-  public Transaction checkpoint(Transaction tx) {
-    long newWritePointer = getWritePointer();
-    LongArrayList newCheckpointPointers = new LongArrayList(tx.getCheckpointWritePointers());
-    newCheckpointPointers.add(newWritePointer);
-    return new Transaction(tx, newWritePointer, newCheckpointPointers.toLongArray());
-  }
-
-  @Override
-  public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
-    throw new TransactionCouldNotTakeSnapshotException(
-        "Snapshot not implemented in detached transaction system client");
-  }
-
-  @Override
-  public String status() {
-    return TxConstants.STATUS_OK;
-  }
-
-  @Override
-  public void resetState() {
-    // do nothing
-  }
-
-  @Override
-  public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
-    return true;
-  }
-
-  @Override
-  public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
-    return true;
-  }
-
-  @Override
-  public int getInvalidSize() {
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java
deleted file mode 100644
index feecdce..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.inmemory;
-
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TxConstants;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.discovery.DiscoveryService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-
-/**
- * Transaction server that manages transaction data for the Reactor.
- * <p>
- *   Transaction server is HA, one can start multiple instances, only one of which is active and will register itself in
- *   discovery service.
- * </p>
- */
-public class InMemoryTransactionService extends AbstractService {
-  private static final Logger LOG = LoggerFactory.getLogger(InMemoryTransactionService.class);
-
-  private final DiscoveryService discoveryService;
-  private final String serviceName;
-  protected final Provider<TransactionManager> txManagerProvider;
-  private Cancellable cancelDiscovery;
-  protected TransactionManager txManager;
-
-  // thrift server config
-  protected final String address;
-  protected final int port;
-  protected final int threads;
-  protected final int ioThreads;
-  protected final int maxReadBufferBytes;
-
-  @Inject
-  public InMemoryTransactionService(Configuration conf,
-                            DiscoveryService discoveryService,
-                            Provider<TransactionManager> txManagerProvider) {
-
-    this.discoveryService = discoveryService;
-    this.txManagerProvider = txManagerProvider;
-    this.serviceName = conf.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME,
-                                TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME);
-
-    address = conf.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS, TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS);
-    port = conf.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT);
-
-    // Retrieve the number of threads for the service
-    threads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS,
-                          TxConstants.Service.DEFAULT_DATA_TX_SERVER_THREADS);
-    ioThreads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS,
-                            TxConstants.Service.DEFAULT_DATA_TX_SERVER_IO_THREADS);
-
-    maxReadBufferBytes = conf.getInt(TxConstants.Service.CFG_DATA_TX_THRIFT_MAX_READ_BUFFER,
-                                     TxConstants.Service.DEFAULT_DATA_TX_THRIFT_MAX_READ_BUFFER);
-
-    LOG.info("Configuring TransactionService" +
-               ", address: " + address +
-               ", port: " + port +
-               ", threads: " + threads +
-               ", io threads: " + ioThreads +
-               ", max read buffer (bytes): " + maxReadBufferBytes);
-  }
-
-  protected void undoRegister() {
-    if (cancelDiscovery != null) {
-      cancelDiscovery.cancel();
-    }
-  }
-
-  protected void doRegister() {
-    cancelDiscovery = discoveryService.register(new Discoverable() {
-      @Override
-      public String getName() {
-        return serviceName;
-      }
-
-      @Override
-      public InetSocketAddress getSocketAddress() {
-        return getAddress();
-      }
-    });
-  }
-
-  protected InetSocketAddress getAddress() {
-    return new InetSocketAddress(1);
-  }
-
-  @Override
-  protected void doStart() {
-    try {
-      txManager = txManagerProvider.get();
-      txManager.startAndWait();
-      doRegister();
-      LOG.info("Transaction Thrift service started successfully on " + getAddress());
-      notifyStarted();
-    } catch (Throwable t) {
-      LOG.info("Transaction Thrift service didn't start on " + getAddress());
-      notifyFailed(t);
-    }
-  }
-
-  @Override
-  protected void doStop() {
-    undoRegister();
-    txManager.stopAndWait();
-    notifyStopped();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java
deleted file mode 100644
index ba15269..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.inmemory;
-
-import co.cask.tephra.InvalidTruncateTimeException;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionNotInProgressException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import com.google.inject.Inject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.Set;
-
-/**
- *
- */
-public class InMemoryTxSystemClient implements TransactionSystemClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(InMemoryTxSystemClient.class);
-
-  TransactionManager txManager;
-
-  @Inject
-  public InMemoryTxSystemClient(TransactionManager txmgr) {
-    txManager = txmgr;
-  }
-
-  @Override
-  public Transaction startLong() {
-    return txManager.startLong();
-  }
-
-  @Override
-  public Transaction startShort() {
-    return txManager.startShort();
-  }
-
-  @Override
-  public Transaction startShort(int timeout) {
-    return txManager.startShort(timeout);
-  }
-
-  @Override
-  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
-    return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
-  }
-
-  @Override
-  public boolean commit(Transaction tx) throws TransactionNotInProgressException {
-    return txManager.commit(tx);
-  }
-
-  @Override
-  public void abort(Transaction tx) {
-    txManager.abort(tx);
-  }
-
-  @Override
-  public boolean invalidate(long tx) {
-    return txManager.invalidate(tx);
-  }
-
-  @Override
-  public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException {
-    return txManager.checkpoint(tx);
-  }
-
-  @Override
-  public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
-    try {
-      ByteArrayOutputStream out = new ByteArrayOutputStream();
-      try {
-        boolean snapshotTaken = txManager.takeSnapshot(out);
-        if (!snapshotTaken) {
-          throw new TransactionCouldNotTakeSnapshotException("Transaction manager did not take a snapshot.");
-        }
-      } finally {
-        out.close();
-      }
-      return new ByteArrayInputStream(out.toByteArray());
-    } catch (IOException e) {
-      LOG.error("Snapshot could not be taken", e);
-      throw new TransactionCouldNotTakeSnapshotException(e.getMessage());
-    }
-  }
-
-  @Override
-  public String status() {
-    return txManager.isRunning() ? TxConstants.STATUS_OK : TxConstants.STATUS_NOTOK;
-  }
-
-  @Override
-  public void resetState() {
-    txManager.resetState();
-  }
-
-  @Override
-  public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
-    return txManager.truncateInvalidTx(invalidTxIds);
-  }
-
-  @Override
-  public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
-    return txManager.truncateInvalidTxBefore(time);
-  }
-
-  @Override
-  public int getInvalidSize() {
-    return txManager.getInvalidSize();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java
deleted file mode 100644
index 6f83565..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.inmemory;
-
-import co.cask.tephra.InvalidTruncateTimeException;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.TransactionNotInProgressException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * Dummy implementation of TxSystemClient. May be useful for perf testing.
- */
-public class MinimalTxSystemClient implements TransactionSystemClient {
-  private long currentTxPointer = 1;
-
-  @Override
-  public Transaction startShort() {
-    long wp = currentTxPointer++;
-    // NOTE: -1 here is because we have logic that uses (readpointer + 1) as a "exclusive stop key" in some datasets
-    return new Transaction(
-      Long.MAX_VALUE - 1, wp, new long[0], new long[0],
-      Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT);
-  }
-
-  @Override
-  public Transaction startShort(int timeout) {
-    return startShort();
-  }
-
-  @Override
-  public Transaction startLong() {
-    return startShort();
-  }
-
-  @Override
-  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) {
-    return true;
-  }
-
-  @Override
-  public boolean commit(Transaction tx) {
-    return true;
-  }
-
-  @Override
-  public void abort(Transaction tx) {
-    // do nothing
-  }
-
-  @Override
-  public boolean invalidate(long tx) {
-    return true;
-  }
-
-  @Override
-  public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException {
-    return tx;
-  }
-
-  @Override
-  public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
-    throw new TransactionCouldNotTakeSnapshotException("Not snapshot to take.");
-  }
-
-  @Override
-  public String status() {
-    return TxConstants.STATUS_OK;
-  }
-
-  @Override
-  public void resetState() {
-    // do nothing
-  }
-
-  @Override
-  public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
-    return true;
-  }
-
-  @Override
-  public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
-    return true;
-  }
-
-  @Override
-  public int getInvalidSize() {
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java
deleted file mode 100644
index b7a7c59..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains in memory implementation of the transaction system v2.
- */
-package co.cask.tephra.inmemory;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
deleted file mode 100644
index 8e33b4d..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.metrics;
-
-import co.cask.tephra.TxConstants;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.Slf4jReporter;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Default metrics collector implementation using <a href="http://metrics.dropwizard.io">Yammer Metrics</a>.
- *
- * <p>The reporting frequency for this collector can be configured by setting the
- * {@code data.tx.metrics.period} configuration property to the reporting frequency in seconds.
- * </p>
- */
-public class DefaultMetricsCollector extends TxMetricsCollector {
-  private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricsCollector.class);
-
-  private final MetricRegistry metrics = new MetricRegistry();
-  private JmxReporter jmxReporter;
-  private ScheduledReporter reporter;
-  private int reportPeriod;
-  private ConcurrentMap<String, AtomicLong> gauges = Maps.newConcurrentMap();
-
-  @Override
-  public void configure(Configuration conf) {
-    // initialize selected output reporter
-    reportPeriod = conf.getInt(TxConstants.Metrics.REPORT_PERIOD_KEY, TxConstants.Metrics.REPORT_PERIOD_DEFAULT);
-    LOG.info("Configured metrics report to emit every {} seconds", reportPeriod);
-    // TODO: reporters should be pluggable based on injection
-    jmxReporter = JmxReporter.forRegistry(metrics).build();
-    reporter = Slf4jReporter.forRegistry(metrics)
-                            .outputTo(LoggerFactory.getLogger("tephra-metrics"))
-                            .convertRatesTo(TimeUnit.SECONDS)
-                            .convertDurationsTo(TimeUnit.MILLISECONDS)
-                            .build();
-  }
-
-
-  @Override
-  public void gauge(String metricName, int value, String... tags) {
-    AtomicLong gauge = gauges.get(metricName);
-    if (gauge == null) {
-      final AtomicLong newValue = new AtomicLong();
-      if (gauges.putIfAbsent(metricName, newValue) == null) {
-        // first to set the value, need to register the metric
-        metrics.register(metricName, new Gauge<Long>() {
-          @Override
-          public Long getValue() {
-            return newValue.get();
-          }
-        });
-        gauge = newValue;
-      } else {
-        // someone else set it first
-        gauge = gauges.get(metricName);
-      }
-    }
-    gauge.set(value);
-  }
-
-  @Override
-  public void histogram(String metricName, int value) {
-    metrics.histogram(metricName).update(value);
-  }
-
-  @Override
-  public void rate(String metricName) {
-    metrics.meter(metricName).mark();
-  }
-
-  @Override
-  public void rate(String metricName, int count) {
-    metrics.meter(metricName).mark(count);
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    jmxReporter.start();
-    reporter.start(reportPeriod, TimeUnit.SECONDS);
-    LOG.info("Started metrics reporter");
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    jmxReporter.stop();
-    reporter.stop();
-    LOG.info("Stopped metrics reporter");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
deleted file mode 100644
index 3aae4e0..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.metrics;
-
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Basic API for Tephra to support system metrics.
- */
-public interface MetricsCollector extends Service {
-  /**
-   * Report a metric as an absolute value.
-   */
-  void gauge(String metricName, int value, String... tags);
-
-  /**
-   * Report a metric as a count over a given time duration.  This method uses an implicit count of 1.
-   */
-  void rate(String metricName);
-
-  /**
-   * Report a metric as a count over a given time duration.
-   */
-  void rate(String metricName, int count);
-
-  /**
-   * Report a metric calculating the distribution of the value.
-   */
-  void histogram(String metricName, int value);
-
-  /**
-   * Called before the collector service is started, allowing the collector to setup any
-   * required configuration.
-   */
-  void configure(Configuration conf);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
deleted file mode 100644
index 7fba8e5..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.metrics;
-
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Metrics Collector Class, to emit Transaction Related Metrics.
- * Note: This default implementation is a no-op and doesn't emit any metrics
- */
-public class TxMetricsCollector extends AbstractIdleService implements MetricsCollector {
-
-  @Override
-  public void gauge(String metricName, int value, String... tags) {
-    //no-op
-  }
-
-  @Override
-  public void rate(String metricName) {
-    // no-op
-  }
-
-  @Override
-  public void rate(String metricName, int count) {
-    // no-op
-  }
-
-  @Override
-  public void histogram(String metricName, int value) {
-    // no-op
-  }
-
-  @Override
-  public void configure(Configuration conf) {
-    // no-op
-  }
-
-  /* Service methods */
-
-  @Override
-  protected void startUp() throws Exception {
-    // no-op
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    // no-op
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/package-info.java b/tephra-core/src/main/java/co/cask/tephra/package-info.java
deleted file mode 100644
index 6f2f858..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains implementations of the transaction system v2.
- */
-package co.cask.tephra;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
deleted file mode 100644
index 173cc9f..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.metrics.MetricsCollector;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Common implementation of a transaction log, backed by file reader and writer based storage.  Classes extending
- * this class, must also implement {@link TransactionLogWriter} and {@link TransactionLogReader}.
- */
-public abstract class AbstractTransactionLog implements TransactionLog {
-  /** Time limit, in milliseconds, of an append to the transaction log before we log it as "slow". */
-  private static final long SLOW_APPEND_THRESHOLD = 1000L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class);
-
-  private final AtomicLong logSequence = new AtomicLong();
-  private final MetricsCollector metricsCollector;
-  protected long timestamp;
-  private volatile boolean initialized;
-  private volatile boolean closed;
-  private AtomicLong syncedUpTo = new AtomicLong();
-  private List<Entry> pendingWrites = Lists.newLinkedList();
-  private TransactionLogWriter writer;
-
-  public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
-    this.timestamp = timestamp;
-    this.metricsCollector = metricsCollector;
-  }
-
-  /**
-   * Initializes the log file, opening a file writer.  Clients calling {@code init()} should ensure that they
-   * also call {@link HDFSTransactionLog#close()}.
-   * @throws java.io.IOException If an error is encountered initializing the file writer.
-   */
-  public synchronized void init() throws IOException {
-    if (initialized) {
-      return;
-    }
-    this.writer = createWriter();
-    this.initialized = true;
-  }
-
-  /**
-   * Returns a log writer to be used for appending any new {@link TransactionEdit} objects.
-   */
-  protected abstract TransactionLogWriter createWriter() throws IOException;
-
-  @Override
-  public abstract String getName();
-
-  @Override
-  public long getTimestamp() {
-    return timestamp;
-  }
-
-  @Override
-  public void append(TransactionEdit edit) throws IOException {
-    long startTime = System.nanoTime();
-    synchronized (this) {
-      ensureAvailable();
-
-      Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
-
-      // add to pending edits
-      append(entry);
-    }
-
-    // wait for sync to complete
-    sync();
-    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
-    if (durationMillis > SLOW_APPEND_THRESHOLD) {
-      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
-    }
-  }
-
-  @Override
-  public void append(List<TransactionEdit> edits) throws IOException {
-    long startTime = System.nanoTime();
-    synchronized (this) {
-      ensureAvailable();
-
-      for (TransactionEdit edit : edits) {
-        Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
-
-        // add to pending edits
-        append(entry);
-      }
-    }
-
-    // wait for sync to complete
-    sync();
-    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
-    if (durationMillis > SLOW_APPEND_THRESHOLD) {
-      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
-    }
-  }
-
-  private void ensureAvailable() throws IOException {
-    if (closed) {
-      throw new IOException("Log " + getName() + " is already closed, cannot append!");
-    }
-    if (!initialized) {
-      init();
-    }
-  }
-
-  /*
-   * Appends new writes to the pendingWrites. It is better to keep it in
-   * our own queue rather than writing it to the HDFS output stream because
-   * HDFSOutputStream.writeChunk is not lightweight at all.
-   */
-  private void append(Entry e) throws IOException {
-    pendingWrites.add(e);
-  }
-
-  // Returns all currently pending writes. New writes
-  // will accumulate in a new list.
-  private List<Entry> getPendingWrites() {
-    synchronized (this) {
-      List<Entry> save = this.pendingWrites;
-      this.pendingWrites = new LinkedList<>();
-      return save;
-    }
-  }
-
-  private void sync() throws IOException {
-    // writes out pending entries to the HLog
-    TransactionLogWriter tmpWriter = null;
-    long latestSeq = 0;
-    int entryCount = 0;
-    synchronized (this) {
-      if (closed) {
-        return;
-      }
-      // prevent writer being dereferenced
-      tmpWriter = writer;
-
-      List<Entry> currentPending = getPendingWrites();
-      if (!currentPending.isEmpty()) {
-        tmpWriter.commitMarker(currentPending.size());
-      }
-
-      // write out all accumulated entries to log.
-      for (Entry e : currentPending) {
-        tmpWriter.append(e);
-        entryCount++;
-        latestSeq = Math.max(latestSeq, e.getKey().get());
-      }
-    }
-
-    long lastSynced = syncedUpTo.get();
-    // someone else might have already synced our edits, avoid double syncing
-    if (lastSynced < latestSeq) {
-      tmpWriter.sync();
-      metricsCollector.histogram("wal.sync.size", entryCount);
-      syncedUpTo.compareAndSet(lastSynced, latestSeq);
-    }
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    if (closed) {
-      return;
-    }
-    // perform a final sync if any outstanding writes
-    if (!pendingWrites.isEmpty()) {
-      sync();
-    }
-    // NOTE: writer is lazy-inited, so it can be null
-    if (writer != null) {
-      this.writer.close();
-    }
-    this.closed = true;
-  }
-
-  public boolean isClosed() {
-    return closed;
-  }
-
-  @Override
-  public abstract TransactionLogReader getReader() throws IOException;
-
-  /**
-   * Represents an entry in the transaction log.  Each entry consists of a key, generated from an incrementing sequence
-   * number, and a value, the {@link TransactionEdit} being stored.
-   */
-  public static class Entry implements Writable {
-    private LongWritable key;
-    private TransactionEdit edit;
-
-    // for Writable
-    public Entry() {
-      this.key = new LongWritable();
-      this.edit = new TransactionEdit();
-    }
-
-    public Entry(LongWritable key, TransactionEdit edit) {
-      this.key = key;
-      this.edit = edit;
-    }
-
-    public LongWritable getKey() {
-      return this.key;
-    }
-
-    public TransactionEdit getEdit() {
-      return this.edit;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      this.key.write(out);
-      this.edit.write(out);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      this.key.readFields(in);
-      this.edit.readFields(in);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java
deleted file mode 100644
index 682435e..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Common base class for all transaction storage implementations. This implement logic to prefix a snapshot
- * with a version when encoding, and to select the correct codec for decoding based on this version prefix.
- */
-public abstract class AbstractTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
-
-  protected final SnapshotCodecProvider codecProvider;
-
-  protected AbstractTransactionStateStorage(SnapshotCodecProvider codecProvider) {
-    this.codecProvider = codecProvider;
-  }
-
-  @Override
-  public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
-    codecProvider.encode(out, snapshot);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java b/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java
deleted file mode 100644
index c1796bd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.primitives.Bytes;
-import com.google.common.primitives.Ints;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.SequenceFile;
-
-import java.io.Closeable;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * Class to read and write commit markers used in {@link HDFSTransactionLogReaderV2} and above.
- */
-public class CommitMarkerCodec implements Closeable {
-  private static final byte[] KEY_BYTES = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED.getBytes(Charsets.UTF_8);
-  private final DataOutputBuffer rawKey;
-  private final DataOutputBuffer rawValue;
-  private SequenceFile.ValueBytes valueBytes;
-
-  public CommitMarkerCodec() {
-    this.rawKey = new DataOutputBuffer();
-    this.rawValue = new DataOutputBuffer();
-  }
-
-  @Override
-  public void close() throws IOException {
-    rawKey.close();
-    rawValue.close();
-  }
-
-  // 1. Returns the count when the marker is written correctly
-  // 2. If data is incorrect (for ex, incorrect key, mismatch in key/value/record length), we throw IOException
-  // since this indicates corrupted log file
-  // 3. If data is incomplete, then we throw EOFException which is handled gracefully by the calling method
-  // since we can recover without any consequence
-  public int readMarker(SequenceFile.Reader reader) throws IOException {
-    if (valueBytes == null) {
-      valueBytes = reader.createValueBytes();
-    }
-    rawKey.reset();
-    rawValue.reset();
-
-    // valueBytes need not be reset since nextRaw call does it (and it is a private method)
-    int status = reader.nextRaw(rawKey, valueBytes);
-
-    // if we reach EOF, return -1
-    if (status == -1) {
-      return -1;
-    }
-
-    // Check if the marker key is valid and return the count
-    if (isMarkerValid()) {
-      valueBytes.writeUncompressedBytes(rawValue);
-      rawValue.flush();
-      // rawValue.getData() may return a larger byte array but Ints.fromByteArray will only read the first four bytes
-      return Ints.fromByteArray(rawValue.getData());
-    }
-
-    // EOF not reached and marker is not valid, then thrown an IOException since we can't make progress
-    throw new IOException(String.format("Invalid key for num entries appended found %s, expected : %s",
-                                        new String(rawKey.getData()), TxConstants.TransactionLog.NUM_ENTRIES_APPENDED));
-  }
-
-  private boolean isMarkerValid() {
-    // rawKey should have the expected length and the matching bytes should start at index 0
-    return rawKey.getLength() == KEY_BYTES.length && Bytes.indexOf(rawKey.getData(), KEY_BYTES) == 0;
-  }
-
-  public static void writeMarker(SequenceFile.Writer writer, int count) throws IOException {
-    writer.appendRaw(KEY_BYTES, 0, KEY_BYTES.length, new CommitEntriesCount(count));
-  }
-
-  @VisibleForTesting
-  static final class CommitEntriesCount implements SequenceFile.ValueBytes {
-    private final int numEntries;
-
-    public CommitEntriesCount(int numEntries) {
-      this.numEntries = numEntries;
-    }
-
-    @Override
-    public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
-      outStream.write(Ints.toByteArray(numEntries));
-    }
-
-    @Override
-    public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
-      throw new IllegalArgumentException("Commit Entries count writing is not expected to be compressed.");
-    }
-
-    @Override
-    public int getSize() {
-      return Ints.BYTES;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
deleted file mode 100644
index bed90c2..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.MetricsCollector;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-
-/**
- * Allows reading from and writing to a transaction write-ahead log stored in HDFS.
- */
-public class HDFSTransactionLog extends AbstractTransactionLog {
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLog.class);
-
-  private final FileSystem fs;
-  private final Configuration hConf;
-  private final Path logPath;
-
-  /**
-   * Creates a new HDFS-backed write-ahead log for storing transaction state.
-   * @param fs Open FileSystem instance for opening log files in HDFS.
-   * @param hConf HDFS cluster configuration.
-   * @param logPath Path to the log file.
-   */
-  public HDFSTransactionLog(final FileSystem fs, final Configuration hConf,
-                            final Path logPath, long timestamp, MetricsCollector metricsCollector) {
-    super(timestamp, metricsCollector);
-    this.fs = fs;
-    this.hConf = hConf;
-    this.logPath = logPath;
-  }
-
-  @Override
-  protected TransactionLogWriter createWriter() throws IOException {
-    return new LogWriter(fs, hConf, logPath);
-  }
-
-  @Override
-  public String getName() {
-    return logPath.getName();
-  }
-
-  @Override
-  public TransactionLogReader getReader() throws IOException {
-    FileStatus status = fs.getFileStatus(logPath);
-    long length = status.getLen();
-
-    TransactionLogReader reader = null;
-    // check if this file needs to be recovered due to failure
-    // Check for possibly empty file. With appends, currently Hadoop reports a
-    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
-    // HDFS-878 is committed.
-    if (length <= 0) {
-      LOG.warn("File " + logPath + " might be still open, length is 0");
-    }
-
-    try {
-      HDFSUtil hdfsUtil = new HDFSUtil();
-      hdfsUtil.recoverFileLease(fs, logPath, hConf);
-      try {
-        FileStatus newStatus = fs.getFileStatus(logPath);
-        LOG.info("New file size for " + logPath + " is " + newStatus.getLen());
-        SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf);
-        reader = new HDFSTransactionLogReaderSupplier(fileReader).get();
-      } catch (EOFException e) {
-        if (length <= 0) {
-          // TODO should we ignore an empty, not-last log file if skip.errors
-          // is false? Either way, the caller should decide what to do. E.g.
-          // ignore if this is the last log in sequence.
-          // TODO is this scenario still possible if the log has been
-          // recovered (i.e. closed)
-          LOG.warn("Could not open " + logPath + " for reading. File is empty", e);
-          return null;
-        } else {
-          // EOFException being ignored
-          return null;
-        }
-      }
-    } catch (IOException e) {
-      throw e;
-    }
-    return reader;
-  }
-
-  @VisibleForTesting
-  static final class LogWriter implements TransactionLogWriter {
-    private final SequenceFile.Writer internalWriter;
-    public LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException {
-      // TODO: retry a few times to ride over transient failures?
-      SequenceFile.Metadata metadata = new SequenceFile.Metadata();
-      metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
-                   new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
-
-      this.internalWriter = SequenceFile.createWriter(fs, hConf, logPath, LongWritable.class, TransactionEdit.class,
-                                                      SequenceFile.CompressionType.NONE, null, null, metadata);
-      LOG.debug("Created a new TransactionLog writer for " + logPath);
-    }
-
-    @Override
-    public void append(Entry entry) throws IOException {
-      internalWriter.append(entry.getKey(), entry.getEdit());
-    }
-
-    @Override
-    public void commitMarker(int count) throws IOException {
-      CommitMarkerCodec.writeMarker(internalWriter, count);
-    }
-
-    @Override
-    public void sync() throws IOException {
-      internalWriter.syncFs();
-    }
-
-    @Override
-    public void close() throws IOException {
-      internalWriter.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java
deleted file mode 100644
index c407945..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import com.google.common.base.Supplier;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-
-/**
- * Provides the correct version of {@link TransactionLogReader}, based on the log's version metadata,
- * to read HDFS Transaction Logs.
- */
-public class HDFSTransactionLogReaderSupplier implements Supplier<TransactionLogReader> {
-  private final SequenceFile.Reader reader;
-  private final byte version;
-  private TransactionLogReader logReader;
-
-  public HDFSTransactionLogReaderSupplier(SequenceFile.Reader reader) {
-    this.reader = reader;
-    Text versionInfo = reader.getMetadata().get(new Text(TxConstants.TransactionLog.VERSION_KEY));
-    this.version = versionInfo == null ? 1 : Byte.parseByte(versionInfo.toString());
-  }
-
-  @Override
-  public TransactionLogReader get() {
-    if (logReader != null) {
-      return logReader;
-    }
-
-    switch (version) {
-      case 2:
-        logReader = new HDFSTransactionLogReaderV2(reader);
-        return logReader;
-      case 1:
-        logReader = new HDFSTransactionLogReaderV1(reader);
-        return logReader;
-      default:
-        throw new IllegalArgumentException(String.format("Invalid version %s found in the Transaction Log", version));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java
deleted file mode 100644
index cb2ce7c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-
-/**
- * {@link TransactionLogReader} for version 1 (the default version) of the transaction log format, in which the
- * underlying sequence file holds a plain sequence of {@link TransactionEdit} entries.
- */
-public class HDFSTransactionLogReaderV1 implements TransactionLogReader {
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV1.class);
-  private final SequenceFile.Reader reader;
-  // key object handed to the sequence file reader on every read; its value is never inspected here
-  private final LongWritable key;
-  private boolean closed;
-
-  /**
-   * @param reader sequence file reader to pull the edits from; it is closed by {@link #close()}
-   */
-  public HDFSTransactionLogReaderV1(SequenceFile.Reader reader) {
-    this.reader = reader;
-    this.key = new LongWritable();
-  }
-
-  /**
-   * Reads the next edit into a newly created {@link TransactionEdit}.
-   *
-   * @return the edit read, or {@code null} if there is none (see {@link #next(TransactionEdit)})
-   */
-  @Override
-  public TransactionEdit next() throws IOException {
-    return next(new TransactionEdit());
-  }
-
-  /**
-   * Reads the next edit into the given instance.
-   *
-   * @param reuse the object to populate with the next entry
-   * @return {@code reuse} when an entry was read; {@code null} when this reader is closed, when the underlying
-   *         reader reports no further entry, or when an {@link EOFException} is hit while reading
-   */
-  @Override
-  public TransactionEdit next(TransactionEdit reuse) throws IOException {
-    if (!closed) {
-      try {
-        if (reader.next(key, reuse)) {
-          return reuse;
-        }
-      } catch (EOFException e) {
-        LOG.warn("Hit an unexpected EOF while trying to read the Transaction Edit. Skipping the entry.", e);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Closes the underlying sequence file reader. Calls made after a successful close do nothing.
-   */
-  @Override
-  public void close() throws IOException {
-    if (!closed) {
-      reader.close();
-      closed = true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java
deleted file mode 100644
index 6981a3b..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-/**
- * {@link TransactionLogReader} that can read version 2 of the transaction log format. These logs contain commit
- * markers, each of which indicates the size of the batch of {@link TransactionEdit}s that follows the marker and
- * was synced together. If fewer {@link TransactionEdit}s than announced by the marker are present, that whole
- * batch of {@link TransactionEdit}s is discarded.
- */
-public class HDFSTransactionLogReaderV2 implements TransactionLogReader {
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class);
-
-  private final SequenceFile.Reader reader;
-  // edits of the most recently read batch that have not been handed out yet
-  private final Queue<TransactionEdit> transactionEdits;
-  private final CommitMarkerCodec commitMarkerCodec;
-  // key object handed to the sequence file reader on every read; its value is never inspected here
-  private final LongWritable key;
-
-  private boolean closed;
-
-  /**
-   * @param reader sequence file reader to pull markers and edits from; it is closed by {@link #close()}
-   */
-  public HDFSTransactionLogReaderV2(SequenceFile.Reader reader) {
-    this.reader = reader;
-    this.transactionEdits = new ArrayDeque<>();
-    this.key = new LongWritable();
-    this.commitMarkerCodec = new CommitMarkerCodec();
-  }
-
-  /**
-   * Closes the commit marker codec and the underlying reader. The reader is closed even if closing the codec
-   * fails. Calls made after the first close attempt do nothing.
-   */
-  @Override
-  public void close() throws IOException {
-    if (closed) {
-      return;
-    }
-    try {
-      commitMarkerCodec.close();
-    } finally {
-      reader.close();
-      closed = true;
-    }
-  }
-
-  /**
-   * Returns the next edit, or {@code null} if none is available (see {@link #next(TransactionEdit)}).
-   */
-  @Override
-  public TransactionEdit next() throws IOException {
-    return next(null);
-  }
-
-  /**
-   * Returns the next edit of the current batch, reading the next batch from the log when the current one is
-   * exhausted.
-   *
-   * @param reuse not used by this implementation; every returned edit is a newly created object
-   * @return the next edit, or {@code null} if this reader is closed or the attempt to read the next batch
-   *         yielded no edits
-   */
-  @Override
-  public TransactionEdit next(TransactionEdit reuse) throws IOException {
-    if (closed) {
-      return null;
-    }
-
-    if (!transactionEdits.isEmpty()) {
-      return transactionEdits.remove();
-    }
-
-    // Fetch the 'marker' and read 'marker' number of edits
-    populateTransactionEdits();
-    return transactionEdits.poll();
-  }
-
-  /**
-   * Reads one commit marker followed by the number of edits it announces and queues those edits. If the end of
-   * the log is reached before the whole batch could be read, the partially read batch is dropped and reading
-   * stops, leaving the queue empty.
-   */
-  private void populateTransactionEdits() throws IOException {
-    // read the marker to determine numEntries to read.
-    int numEntries = 0;
-    try {
-      // can throw EOFException if reading of incomplete commit marker, no other action required since we can safely
-      // ignore this
-      numEntries = commitMarkerCodec.readMarker(reader);
-    } catch (EOFException e) {
-      LOG.warn("Reached EOF in log while trying to read commit marker", e);
-    }
-
-    for (int i = 0; i < numEntries; i++) {
-      TransactionEdit edit = new TransactionEdit();
-      try {
-        if (reader.next(key, edit)) {
-          transactionEdits.add(edit);
-        } else {
-          throw new EOFException("Attempt to read TransactionEdit failed.");
-        }
-      } catch (EOFException e) {
-        // we have reached EOF before reading back numEntries, we clear the partial list and return.
-        LOG.warn("Reached EOF in log before reading {} entries. Ignoring all {} edits since the last marker",
-                 numEntries, transactionEdits.size(), e);
-        transactionEdits.clear();
-        // stop here: continuing the loop would only read past EOF again and log one warning per missing entry
-        return;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
deleted file mode 100644
index bc7fb9f..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import co.cask.tephra.util.ConfigurationFactory;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.io.CountingInputStream;
-import com.google.common.primitives.Longs;
-import com.google.inject.Inject;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * Handles persistence of transaction snapshots and logs to a directory in HDFS.
- *
- * The directory used for file storage is configured using the {@code data.tx.snapshot.dir} configuration property.
- * Both snapshot and transaction log files are suffixed with a timestamp to allow easy ordering.  Snapshot files
- * are written with the filename "snapshot.&lt;timestamp&gt;".  Transaction log files are written with the filename
- * "txlog.&lt;timestamp&gt;".
- */
-public class HDFSTransactionStateStorage extends AbstractTransactionStateStorage {
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionStateStorage.class);
-
-  private static final String SNAPSHOT_FILE_PREFIX = "snapshot.";
-  private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress.snapshot.";
-  private static final String LOG_FILE_PREFIX = "txlog.";
-
-  // accepts completed snapshot files only; in-progress files start with a different prefix
-  private static final PathFilter SNAPSHOT_FILE_FILTER = new PathFilter() {
-    @Override
-    public boolean accept(Path path) {
-      return path.getName().startsWith(SNAPSHOT_FILE_PREFIX);
-    }
-  };
-
-  // buffer size used for HDFS reads and writes
-  private static final int BUFFER_SIZE = 16384;
-
-  private final Configuration hConf;
-  private final String configuredSnapshotDir;
-  private final MetricsCollector metricsCollector;
-  // both fields below are assigned in startUp()
-  private FileSystem fs;
-  private Path snapshotDir;
-
-  /**
-   * @param hConf configuration from which the snapshot directory and the optional HDFS user are read
-   * @param codecProvider codec provider used to encode and decode snapshots
-   * @param metricsCollector collector passed on to the transaction logs opened by this storage
-   */
-  @Inject
-  public HDFSTransactionStateStorage(Configuration hConf, SnapshotCodecProvider codecProvider,
-                                     MetricsCollector metricsCollector) {
-    super(codecProvider);
-    this.hConf = hConf;
-    this.configuredSnapshotDir = hConf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
-    this.metricsCollector = metricsCollector;
-  }
-
-  /**
-   * Opens a new {@link FileSystem} instance and resolves the snapshot directory.
-   *
-   * @throws IllegalStateException if no snapshot directory is configured
-   */
-  @Override
-  protected void startUp() throws Exception {
-    Preconditions.checkState(configuredSnapshotDir != null,
-        "Snapshot directory is not configured.  Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_DIR +
-        " in configuration.");
-    String hdfsUser = hConf.get(TxConstants.Manager.CFG_TX_HDFS_USER);
-    if (hdfsUser == null || UserGroupInformation.isSecurityEnabled()) {
-      if (hdfsUser != null && LOG.isDebugEnabled()) {
-        LOG.debug("Ignoring configuration {}={}, running on secure Hadoop",
-                  TxConstants.Manager.CFG_TX_HDFS_USER, hdfsUser);
-      }
-      // NOTE: we can start multiple times this storage. As hdfs uses per-jvm cache, we want to create new fs instead
-      //       of getting closed one
-      fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf);
-    } else {
-      fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf, hdfsUser);
-    }
-    snapshotDir = new Path(configuredSnapshotDir);
-    LOG.info("Using snapshot dir {}", snapshotDir);
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    fs.close();
-  }
-
-  /**
-   * Writes the snapshot to a temporary file and then renames that file to its final
-   * "snapshot.&lt;timestamp&gt;" name.
-   *
-   * @throws IOException if the snapshot cannot be written, or if the temporary file cannot be renamed into place
-   */
-  @Override
-  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
-    // create a temporary file, and save the snapshot
-    Path snapshotTmpFile = new Path(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
-    LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile);
-
-    // encode the snapshot and stream the serialized version to the file; try-with-resources makes sure that a
-    // failure while closing does not hide a failure while encoding
-    try (FSDataOutputStream out = fs.create(snapshotTmpFile, false, BUFFER_SIZE)) {
-      codecProvider.encode(out, snapshot);
-    }
-
-    // move the temporary file into place with the correct filename
-    Path finalFile = new Path(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
-    if (!fs.rename(snapshotTmpFile, finalFile)) {
-      // rename reports failure through its return value; without this check a missing snapshot went unnoticed
-      throw new IOException("Failed to rename temporary snapshot file " + snapshotTmpFile + " to " + finalFile);
-    }
-    LOG.debug("Completed snapshot to file {}", finalFile);
-  }
-
-  /**
-   * @return the most recent snapshot, or {@code null} if the snapshot directory contains no snapshot file
-   */
-  @Override
-  public TransactionSnapshot getLatestSnapshot() throws IOException {
-    // a null resource is permitted by try-with-resources and is simply not closed
-    try (InputStream in = getLatestSnapshotInputStream()) {
-      return in == null ? null : readSnapshotInputStream(in);
-    }
-  }
-
-  /**
-   * @return the visibility state decoded from the most recent snapshot, or {@code null} if the snapshot directory
-   *         contains no snapshot file
-   */
-  @Override
-  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
-    // a null resource is permitted by try-with-resources and is simply not closed
-    try (InputStream in = getLatestSnapshotInputStream()) {
-      return in == null ? null : readTransactionVisibilityStateFromInputStream(in);
-    }
-  }
-
-  /**
-   * Opens the snapshot file that sorts last, or returns {@code null} if there is no snapshot file.
-   * The caller is responsible for closing the returned stream.
-   */
-  private InputStream getLatestSnapshotInputStream() throws IOException {
-    TimestampedFilename[] snapshots = listSnapshotFiles();
-    Arrays.sort(snapshots);
-    if (snapshots.length > 0) {
-      // last is the most recent
-      return fs.open(snapshots[snapshots.length - 1].getPath(), BUFFER_SIZE);
-    }
-
-    LOG.info("No snapshot files found in {}", snapshotDir);
-    return null;
-  }
-
-  /**
-   * Decodes a full snapshot from the stream and logs the number of bytes consumed. Does not close the stream.
-   */
-  private TransactionSnapshot readSnapshotInputStream(InputStream in) throws IOException {
-    CountingInputStream countingIn = new CountingInputStream(in);
-    TransactionSnapshot snapshot = codecProvider.decode(countingIn);
-    LOG.info("Read encoded transaction snapshot of {} bytes", countingIn.getCount());
-    return snapshot;
-  }
-
-  /**
-   * Decodes the visibility state from the stream and logs the number of bytes consumed. Does not close the
-   * stream.
-   */
-  private TransactionVisibilityState readTransactionVisibilityStateFromInputStream(InputStream in) throws IOException {
-    CountingInputStream countingIn = new CountingInputStream(in);
-    TransactionVisibilityState state = codecProvider.decodeTransactionVisibilityState(countingIn);
-    LOG.info("Read encoded transaction snapshot of {} bytes", countingIn.getCount());
-    return state;
-  }
-
-  /**
-   * Reads and decodes the snapshot stored in the given file.
-   */
-  private TransactionSnapshot readSnapshotFile(Path filePath) throws IOException {
-    try (FSDataInputStream in = fs.open(filePath, BUFFER_SIZE)) {
-      return readSnapshotInputStream(in);
-    }
-  }
-
-  /**
-   * Lists the snapshot files in the snapshot directory, in the order returned by the file system.
-   *
-   * @throws IllegalArgumentException if a matching file name is not of the form prefix.timestamp
-   */
-  private TimestampedFilename[] listSnapshotFiles() throws IOException {
-    FileStatus[] snapshotFileStatuses = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER);
-    TimestampedFilename[] snapshotFiles = new TimestampedFilename[snapshotFileStatuses.length];
-    for (int i = 0; i < snapshotFileStatuses.length; i++) {
-      snapshotFiles[i] = new TimestampedFilename(snapshotFileStatuses[i].getPath());
-    }
-    return snapshotFiles;
-  }
-
-  /**
-   * Deletes all but the {@code numberToKeep} most recent snapshot files.
-   *
-   * @param numberToKeep number of snapshots to retain; must be positive whenever there is something to delete
-   * @return the timestamp of the oldest snapshot that was retained, or -1 if there are no snapshot files
-   * @throws IllegalArgumentException if snapshots would have to be deleted and {@code numberToKeep} is not positive
-   */
-  @Override
-  public long deleteOldSnapshots(int numberToKeep) throws IOException {
-    TimestampedFilename[] snapshots = listSnapshotFiles();
-    if (snapshots.length == 0) {
-      return -1;
-    }
-    // most recent first
-    Arrays.sort(snapshots, Collections.reverseOrder());
-    if (snapshots.length <= numberToKeep) {
-      // nothing to remove, oldest timestamp is the last snapshot
-      return snapshots[snapshots.length - 1].getTimestamp();
-    }
-    // reject the request before touching any file: previously a value of 0 removed every snapshot and then failed
-    // with an ArrayIndexOutOfBoundsException when looking up the oldest retained snapshot
-    Preconditions.checkArgument(numberToKeep > 0, "Number of snapshots to keep must be positive, got %s",
-                                numberToKeep);
-
-    int removedCnt = 0;
-    for (int i = numberToKeep; i < snapshots.length; i++) {
-      Path toRemove = snapshots[i].getPath();
-      LOG.debug("Removing old snapshot file {}", toRemove);
-      if (fs.delete(toRemove, false)) {
-        removedCnt++;
-      } else {
-        LOG.error("Failed to delete snapshot file {}", toRemove);
-      }
-    }
-    long oldestTimestamp = snapshots[numberToKeep - 1].getTimestamp();
-    LOG.debug("Removed {} old snapshot files prior to {}", removedCnt, oldestTimestamp);
-    return oldestTimestamp;
-  }
-
-  /**
-   * @return the file names of all snapshot files in the snapshot directory
-   */
-  @Override
-  public List<String> listSnapshots() throws IOException {
-    FileStatus[] files = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER);
-    return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() {
-      @Nullable
-      @Override
-      public String apply(@Nullable FileStatus input) {
-        return input.getPath().getName();
-      }
-    });
-  }
-
-  /**
-   * @param timestamp lower bound (inclusive) for the timestamp in the log file name
-   * @return the transaction logs whose file name timestamp is at or after the given timestamp, in the order
-   *         returned by the file system
-   */
-  @Override
-  public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
-    FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(timestamp, Long.MAX_VALUE));
-    TimestampedFilename[] timestampedFiles = new TimestampedFilename[statuses.length];
-    for (int i = 0; i < statuses.length; i++) {
-      timestampedFiles[i] = new TimestampedFilename(statuses[i].getPath());
-    }
-    return Lists.transform(Arrays.asList(timestampedFiles), new Function<TimestampedFilename, TransactionLog>() {
-      @Nullable
-      @Override
-      public TransactionLog apply(@Nullable TimestampedFilename input) {
-        return openLog(input.getPath(), input.getTimestamp());
-      }
-    });
-  }
-
-  /**
-   * Returns a transaction log object for the file "txlog.&lt;timestamp&gt;" in the snapshot directory.
-   */
-  @Override
-  public TransactionLog createLog(long timestamp) throws IOException {
-    Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timestamp);
-    return openLog(newLog, timestamp);
-  }
-
-  private TransactionLog openLog(Path path, long timestamp) {
-    return new HDFSTransactionLog(fs, hConf, path, timestamp, metricsCollector);
-  }
-
-  /**
-   * Deletes all transaction log files whose file name timestamp is lower than the given timestamp. Files that
-   * cannot be deleted are reported in the log and skipped.
-   */
-  @Override
-  public void deleteLogsOlderThan(long timestamp) throws IOException {
-    FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(0, timestamp));
-    int removedCnt = 0;
-    for (FileStatus status : statuses) {
-      LOG.debug("Removing old transaction log {}", status.getPath());
-      if (fs.delete(status.getPath(), false)) {
-        removedCnt++;
-      } else {
-        LOG.error("Failed to delete transaction log file {}", status.getPath());
-      }
-    }
-    LOG.debug("Removed {} transaction logs older than {}", removedCnt, timestamp);
-  }
-
-  /**
-   * Creates the snapshot directory if it does not exist yet.
-   *
-   * @throws IllegalStateException if the configured path exists but is not a directory
-   */
-  @Override
-  public void setupStorage() throws IOException {
-    if (!fs.exists(snapshotDir)) {
-      LOG.info("Creating snapshot dir at {}", snapshotDir);
-      fs.mkdirs(snapshotDir);
-    } else {
-      Preconditions.checkState(fs.isDirectory(snapshotDir),
-                               "Configured snapshot directory " + snapshotDir + " is not a directory!");
-    }
-  }
-
-  /**
-   * @return the file names of all transaction log files in the snapshot directory
-   */
-  @Override
-  public List<String> listLogs() throws IOException {
-    FileStatus[] files = fs.listStatus(snapshotDir, new LogFileFilter(0, Long.MAX_VALUE));
-    return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() {
-      @Nullable
-      @Override
-      public String apply(@Nullable FileStatus input) {
-        return input.getPath().getName();
-      }
-    });
-  }
-
-  @Override
-  public String getLocation() {
-    return snapshotDir.toString();
-  }
-
-  /**
-   * Accepts files named "txlog.&lt;timestamp&gt;" whose timestamp lies within a half-open time range.
-   */
-  private static class LogFileFilter implements PathFilter {
-    // starting time of files to include (inclusive)
-    private final long startTime;
-    // ending time of files to include (exclusive)
-    private final long endTime;
-
-    public LogFileFilter(long startTime, long endTime) {
-      this.startTime = startTime;
-      this.endTime = endTime;
-    }
-
-    @Override
-    public boolean accept(Path path) {
-      if (path.getName().startsWith(LOG_FILE_PREFIX)) {
-        String[] parts = path.getName().split("\\.");
-        if (parts.length == 2) {
-          try {
-            long fileTime = Long.parseLong(parts[1]);
-            return fileTime >= startTime && fileTime < endTime;
-          } catch (NumberFormatException ignored) {
-            // not a timestamp suffix: the file is reported and rejected
-            LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", path.getName());
-          }
-        }
-      }
-      return false;
-    }
-  }
-
-  /**
-   * Represents a filename composed of a prefix and a ".timestamp" suffix.  This is useful for manipulating both
-   * snapshot and transaction log filenames.  Instances order by prefix first and by timestamp second.
-   */
-  private static class TimestampedFilename implements Comparable<TimestampedFilename> {
-    private final Path path;
-    private final String prefix;
-    private final long timestamp;
-
-    /**
-     * @throws IllegalArgumentException if the file name does not consist of exactly two dot-separated parts
-     * @throws NumberFormatException if the part after the dot is not a valid long
-     */
-    public TimestampedFilename(Path path) {
-      this.path = path;
-      String[] parts = path.getName().split("\\.");
-      if (parts.length != 2) {
-        throw new IllegalArgumentException("Filename " + path.getName() +
-            " did not match the expected pattern prefix.timestamp");
-      }
-      prefix = parts[0];
-      timestamp = Long.parseLong(parts[1]);
-    }
-
-    public Path getPath() {
-      return path;
-    }
-
-    public String getPrefix() {
-      return prefix;
-    }
-
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public int compareTo(TimestampedFilename other) {
-      int res = prefix.compareTo(other.getPrefix());
-      if (res == 0) {
-        res = Longs.compare(timestamp, other.getTimestamp());
-      }
-      return res;
-    }
-  }
-
-  // TODO move this out as a separate command line tool
-  private enum CLIMode { SNAPSHOT, TXLOG }
-
-  /**
-   * Reads a transaction state snapshot or transaction log from HDFS and prints the entries to stdout.
-   *
-   * Supports the following options:
-   *    -s    read snapshot state (defaults to the latest)
-   *    -l    read a transaction log
-   *    [filename]  reads the given file
-   * @param args command line arguments: a mode flag ({@code -s} or {@code -l}), optionally {@code -h}, and the
-   *             names of the files to read
-   */
-  public static void main(String[] args) {
-    List<String> filenames = Lists.newArrayList();
-    CLIMode mode = null;
-    for (String arg : args) {
-      if ("-s".equals(arg)) {
-        mode = CLIMode.SNAPSHOT;
-      } else if ("-l".equals(arg)) {
-        mode = CLIMode.TXLOG;
-      } else if ("-h".equals(arg)) {
-        printUsage(null);
-      } else {
-        filenames.add(arg);
-      }
-    }
-
-    if (mode == null) {
-      printUsage("ERROR: Either -s or -l is required to set mode.", 1);
-    }
-
-    Configuration config = new ConfigurationFactory().get();
-
-    // Use the no-op metrics collector.  We are being run as a command line tool, so there are no relevant metrics
-    // to report
-    HDFSTransactionStateStorage storage =
-      new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector());
-    storage.startAndWait();
-    try {
-      switch (mode) {
-        case SNAPSHOT:
-          try {
-            if (filenames.isEmpty()) {
-              TransactionSnapshot snapshot = storage.getLatestSnapshot();
-              if (snapshot == null) {
-                // getLatestSnapshot() returns null when the directory holds no snapshot file
-                System.out.println("No snapshot found in " + storage.getLocation());
-              } else {
-                printSnapshot(snapshot);
-              }
-            }
-            for (String file : filenames) {
-              Path path = new Path(file);
-              TransactionSnapshot snapshot = storage.readSnapshotFile(path);
-              printSnapshot(snapshot);
-              System.out.println();
-            }
-          } catch (IOException ioe) {
-            System.err.println("Error reading snapshot files: " + ioe.getMessage());
-            ioe.printStackTrace();
-            System.exit(1);
-          }
-          break;
-        case TXLOG:
-          if (filenames.isEmpty()) {
-            printUsage("ERROR: At least one transaction log filename is required!", 1);
-          }
-          for (String file : filenames) {
-            TimestampedFilename timestampedFilename = new TimestampedFilename(new Path(file));
-            TransactionLog log = storage.openLog(timestampedFilename.getPath(), timestampedFilename.getTimestamp());
-            printLog(log);
-            System.out.println();
-          }
-          break;
-      }
-    } finally {
-      storage.stop();
-    }
-  }
-
-  /**
-   * Prints the optional message and the usage text, then exits the JVM with exit code 0.
-   */
-  private static void printUsage(String message) {
-    printUsage(message, 0);
-  }
-
-  /**
-   * Prints the optional message and the usage text, then exits the JVM with the given exit code.
-   */
-  private static void printUsage(String message, int exitCode) {
-    if (message != null) {
-      System.out.println(message);
-    }
-    System.out.println("Usage: java " + HDFSTransactionStateStorage.class.getName() + " (-s|-l) file1 [file2...]");
-    System.out.println();
-    System.out.println("\t-s\tRead files as transaction state snapshots (will default to latest if no file given)");
-    System.out.println("\t-l\tRead files as transaction logs [filename is required]");
-    System.out.println("\t-h\tPrint this message");
-    System.exit(exitCode);
-  }
-
-  private static void printSnapshot(TransactionSnapshot snapshot) {
-    Date snapshotDate = new Date(snapshot.getTimestamp());
-    System.out.println("TransactionSnapshot at " + snapshotDate.toString());
-    System.out.println("\t" + snapshot.toString());
-  }
-
-  /**
-   * Prints every edit of the given log to stdout, one per line and prefixed with a running sequence number.
-   * I/O errors are reported on stderr and end the printing of this log.
-   */
-  private static void printLog(TransactionLog log) {
-    try {
-      System.out.println("TransactionLog " + log.getName());
-      TransactionLogReader reader = log.getReader();
-      try {
-        TransactionEdit edit;
-        long seq = 0;
-        while ((edit = reader.next()) != null) {
-          System.out.println(String.format("    %d: %s", seq++, edit.toString()));
-        }
-      } finally {
-        // release the underlying file handle; the reader was previously left open
-        reader.close();
-      }
-    } catch (IOException ioe) {
-      System.err.println("ERROR reading log " + log.getName() + ": " + ioe.getMessage());
-      ioe.printStackTrace();
-    }
-  }
-}