You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2016/05/06 23:02:35 UTC

[29/51] [partial] incubator-tephra git commit: Rename package to org.apache.tephra
diff --git a/tephra-core/src/test/java/co/cask/tephra/distributed/ElasticPoolTest.java b/tephra-core/src/test/java/co/cask/tephra/distributed/ElasticPoolTest.java
deleted file mode 100644
index 40354cf..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/distributed/ElasticPoolTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.distributed;
-import org.junit.Assert;
-import org.junit.Test;
-import java.util.concurrent.atomic.AtomicInteger;
-/**
- * Tests for {@link ElasticPool}.
- */
-public class ElasticPoolTest {
-  static class Dummy {
-    static AtomicInteger count = new AtomicInteger(0);
-    boolean valid = true;
-    Dummy() {
-      count.incrementAndGet();
-    }
-    void markInvalid() {
-      valid = false;
-    }
-    public boolean isValid() {
-      return valid;
-    }
-  }
-  class DummyPool extends ElasticPool<Dummy, RuntimeException> {
-    public DummyPool(int sizeLimit) {
-      super(sizeLimit);
-    }
-    @Override
-    protected Dummy create() {
-      return new Dummy();
-    }
-    @Override
-    protected boolean recycle(Dummy element) {
-      return element.isValid();
-    }
-  }
-  @Test(timeout = 5000)
-  public void testFewerThreadsThanElements() throws InterruptedException {
-    final DummyPool pool = new DummyPool(5);
-    Dummy.count.set(0);
-    createAndRunThreads(2, pool, false);
-    // we only ran 2 threads, so only 2 elements got created, even though pool size is 5
-    Assert.assertEquals(2, Dummy.count.get());
-  }
-  @Test(timeout = 5000)
-  public void testMoreThreadsThanElements() throws InterruptedException {
-    final DummyPool pool = new DummyPool(2);
-    Dummy.count.set(0);
-    createAndRunThreads(5, pool, false);
-    // even though we ran 5 threads, only 2 elements got created because pool size is 2
-    Assert.assertEquals(2, Dummy.count.get());
-  }
-  @Test(timeout = 5000)
-  public void testMoreThreadsThanElementsWithDiscard() throws InterruptedException {
-    final DummyPool pool = new DummyPool(2);
-    Dummy.count.set(0);
-    int numThreads = 3;
-    // pass 'true' as the last parameter, which results in the elements being discarded after each obtain() call.
-    createAndRunThreads(numThreads, pool, true);
-    // this results in (5 * numThreads) number of elements being created since each thread obtains a client 5 times.
-    Assert.assertEquals(5 * numThreads, Dummy.count.get());
-  }
-  // Creates a list of threads which obtain a client from the pool, sleeps for a certain amount of time, and then
-  // releases the client back to the pool, optionally marking it invalid before doing so. It repeats this five times.
-  // Then, runs these threads to completion.
-  private void createAndRunThreads(int numThreads, final DummyPool pool,
-                                   final boolean discardAtEnd) throws InterruptedException {
-    Thread[] threads = new Thread[numThreads];
-    for (int i = 0; i < numThreads; i++) {
-      threads[i] = new Thread() {
-        @Override
-        public void run() {
-          for (int j = 0; j < 5; ++j) {
-            Dummy dummy;
-            try {
-              dummy = pool.obtain();
-            } catch (InterruptedException e) {
-              throw Throwables.propagate(e);
-            }
-            try {
-              Thread.sleep(10L);
-            } catch (InterruptedException e) {
-              // ignored
-            }
-            if (discardAtEnd) {
-              dummy.markInvalid();
-            }
-            pool.release(dummy);
-          }
-        }
-      };
-    }
-    for (Thread t : threads) {
-      t.start();
-    }
-    for (Thread t : threads) {
-      t.join();
-    }
-  }
diff --git a/tephra-core/src/test/java/co/cask/tephra/distributed/PooledClientProviderTest.java b/tephra-core/src/test/java/co/cask/tephra/distributed/PooledClientProviderTest.java
deleted file mode 100644
index 354b5b7..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/distributed/PooledClientProviderTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.distributed;
-import co.cask.tephra.TransactionServiceMain;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
-public class PooledClientProviderTest {
-  public static final int MAX_CLIENT_COUNT = 3;
-  public static final long CLIENT_OBTAIN_TIMEOUT = 10;
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-  @Test
-  public void testClientConnectionPoolMaximumNumberOfClients() throws Exception {
-    // We need a server for the client to connect to
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
-    zkServer.startAndWait();
-    try {
-      Configuration conf = new Configuration();
-      conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
-      conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
-      conf.set("data.tx.client.count", Integer.toString(MAX_CLIENT_COUNT));
-      conf.set("data.tx.client.obtain.timeout", Long.toString(CLIENT_OBTAIN_TIMEOUT));
-      final TransactionServiceMain main = new TransactionServiceMain(conf);
-      final CountDownLatch latch = new CountDownLatch(1);
-      Thread t = new Thread() {
-        @Override
-        public void run() {
-          try {
-            main.start();
-            latch.countDown();
-          } catch (Exception e) {
-            throw Throwables.propagate(e);
-          }
-        }
-      };
-      try {
-        t.start();
-        // Wait for service to startup
-        latch.await();
-        startClientAndTestPool(conf);
-      } finally {
-        main.stop();
-        t.join();
-      }
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-  private void startClientAndTestPool(Configuration conf) throws Exception {
-    Injector injector = Guice.createInjector(
-      new ConfigModule(conf),
-      new ZKModule(),
-      new DiscoveryModules().getDistributedModules(),
-      new TransactionModules().getDistributedModules(),
-      new TransactionClientModule()
-    );
-    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
-    zkClient.startAndWait();
-    final PooledClientProvider clientProvider = new PooledClientProvider(conf,
-      injector.getInstance(DiscoveryServiceClient.class));
-    // test simple case of get + return. Note: this also initializes the provider's pool, which
-    // takes about one second (discovery). Doing it before we test the threads makes it so that one
-    // thread doesn't take exceptionally longer than the others.
-    try (CloseableThriftClient closeableThriftClient = clientProvider.getCloseableClient()) {
-      // do nothing with the client
-    }
-    //Now race to get MAX_CLIENT_COUNT+1 clients, exhausting the pool and requesting 1 more.
-    List<Future<Integer>> clientIds = new ArrayList<Future<Integer>>();
-    CountDownLatch countDownLatch = new CountDownLatch(1);
-    ExecutorService executor = Executors.newFixedThreadPool(MAX_CLIENT_COUNT + 1);
-    for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) {
-      clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT / 2, countDownLatch)));
-    }
-    countDownLatch.countDown();
-    Set<Integer> ids = new HashSet<Integer>();
-    for (Future<Integer> id : clientIds) {
-      ids.add(id.get());
-    }
-    Assert.assertEquals(MAX_CLIENT_COUNT, ids.size());
-    // now, try it again with, where each thread holds onto the client for twice the client.obtain.timeout value.
-    // one of the threads should throw a TimeOutException, because the other threads don't release their clients
-    // within the configured timeout.
-    countDownLatch = new CountDownLatch(1);
-    for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) {
-      clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT * 2, countDownLatch)));
-    }
-    countDownLatch.countDown();
-    int numTimeoutExceptions = 0;
-    for (Future<Integer> clientId : clientIds) {
-      try {
-        clientId.get();
-      } catch (ExecutionException expected) {
-        Assert.assertEquals(TimeoutException.class, expected.getCause().getClass());
-        numTimeoutExceptions++;
-      }
-    }
-    // expect that exactly one of the threads hit the TimeoutException
-    Assert.assertEquals(String.format("Expected one thread to not obtain a client within %s milliseconds.",
-                                      CLIENT_OBTAIN_TIMEOUT),
-                        1, numTimeoutExceptions);
-    executor.shutdown();
-  }
-  private static class RetrieveClient implements Callable<Integer> {
-    private final PooledClientProvider pool;
-    private final long holdClientMs;
-    private final CountDownLatch begin;
-    public RetrieveClient(PooledClientProvider pool, long holdClientMs,
-                          CountDownLatch begin) {
-      this.pool = pool;
-      this.holdClientMs = holdClientMs;
-      this.begin = begin;
-    }
-    @Override
-    public Integer call() throws Exception {
-      begin.await();
-      try (CloseableThriftClient client = pool.getCloseableClient()) {
-        int id = System.identityHashCode(client.getThriftClient());
-        // "use" the client for a configured amount of milliseconds
-        Thread.sleep(holdClientMs);
-        return id;
-      }
-    }
-  }
diff --git a/tephra-core/src/test/java/co/cask/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/co/cask/tephra/distributed/ThriftTransactionServerTest.java
deleted file mode 100644
index 83aad3c..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/distributed/ThriftTransactionServerTest.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.distributed;
-import co.cask.tephra.ThriftTransactionSystemTest;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-import co.cask.tephra.persist.TransactionEdit;
-import co.cask.tephra.persist.TransactionLog;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-/**
- * This tests whether transaction service hangs on stop when heavily loaded -
- */
-public class ThriftTransactionServerTest {
-  private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);
-  private static InMemoryZKServer zkServer;
-  private static ZKClientService zkClientService;
-  private static TransactionService txService;
-  private static TransactionStateStorage storage;
-  static Injector injector;
-  private static final int NUM_CLIENTS = 17;
-  private static final CountDownLatch STORAGE_WAIT_LATCH = new CountDownLatch(1);
-  private static final CountDownLatch CLIENTS_DONE_LATCH = new CountDownLatch(NUM_CLIENTS);
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-  @BeforeClass
-  public static void start() throws Exception {
-    zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
-    zkServer.startAndWait();
-    Configuration conf = new Configuration();
-    conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
-    conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
-    conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
-    conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
-    conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT, NUM_CLIENTS);
-    conf.setLong(TxConstants.Service.CFG_DATA_TX_CLIENT_TIMEOUT, TimeUnit.HOURS.toMillis(1));
-    conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS, 2);
-    conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS, 4);
-    injector = Guice.createInjector(
-      new ConfigModule(conf),
-      new ZKModule(),
-      new DiscoveryModules().getDistributedModules(),
-      Modules.override(new TransactionModules().getDistributedModules())
-        .with(new AbstractModule() {
-          @Override
-          protected void configure() {
-            bind(TransactionStateStorage.class).to(SlowTransactionStorage.class).in(Scopes.SINGLETON);
-          }
-        }),
-      new TransactionClientModule()
-    );
-    zkClientService = injector.getInstance(ZKClientService.class);
-    zkClientService.startAndWait();
-    // start a tx server
-    txService = injector.getInstance(TransactionService.class);
-    storage = injector.getInstance(TransactionStateStorage.class);
-    try {
-      LOG.info("Starting transaction service");
-      txService.startAndWait();
-    } catch (Exception e) {
-      LOG.error("Failed to start service: ", e);
-    }
-  }
-  @Before
-  public void reset() throws Exception {
-    getClient().resetState();
-  }
-  @AfterClass
-  public static void stop() throws Exception {
-    txService.stopAndWait();
-    storage.stopAndWait();
-    zkClientService.stopAndWait();
-    zkServer.stopAndWait();
-  }
-  public TransactionSystemClient getClient() throws Exception {
-    return injector.getInstance(TransactionSystemClient.class);
-  }
-  @Test
-  public void testThriftServerStop() throws Exception {
-    int nThreads = NUM_CLIENTS;
-    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
-    for (int i = 0; i < nThreads; ++i) {
-      executorService.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            TransactionSystemClient txClient = getClient();
-            CLIENTS_DONE_LATCH.countDown();
-            txClient.startShort();
-          } catch (Exception e) {
-            // Exception expected
-          }
-        }
-      });
-    }
-    // Wait till all clients finish sending reqeust to transaction manager
-    CLIENTS_DONE_LATCH.await();
-    TimeUnit.SECONDS.sleep(1);
-    // Expire zookeeper session, which causes Thrift server to stop.
-    expireZkSession(zkClientService);
-    waitForThriftTermination();
-    // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift sever again.
-    zkClientService.stopAndWait();
-    STORAGE_WAIT_LATCH.countDown();
-    TimeUnit.SECONDS.sleep(1);
-    // Make sure Thrift server stopped.
-    Assert.assertEquals(Service.State.TERMINATED, txService.thriftRPCServerState());
-  }
-  private void expireZkSession(ZKClientService zkClientService) throws Exception {
-    ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get();
-    final SettableFuture<?> connectFuture = SettableFuture.create();
-    Watcher watcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        if (event.getState() == Event.KeeperState.SyncConnected) {
-          connectFuture.set(null);
-        }
-      }
-    };
-    // Create another Zookeeper session with the same sessionId so that the original one expires.
-    final ZooKeeper dupZookeeper =
-      new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher,
-                    zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
-    connectFuture.get(30, TimeUnit.SECONDS);
-    Assert.assertEquals("Failed to re-create current session", dupZookeeper.getState(), ZooKeeper.States.CONNECTED);
-    dupZookeeper.close();
-  }
-  private void waitForThriftTermination() throws InterruptedException {
-    int count = 0;
-    while (txService.thriftRPCServerState() != Service.State.TERMINATED && count++ < 200) {
-      TimeUnit.MILLISECONDS.sleep(50);
-    }
-  }
-  private static class SlowTransactionStorage extends InMemoryTransactionStateStorage {
-    @Override
-    public TransactionLog createLog(long timestamp) throws IOException {
-      return new SlowTransactionLog(timestamp);
-    }
-  }
-  private static class SlowTransactionLog extends InMemoryTransactionStateStorage.InMemoryTransactionLog {
-    public SlowTransactionLog(long timestamp) {
-      super(timestamp);
-    }
-    @Override
-    public void append(TransactionEdit edit) throws IOException {
-      try {
-        STORAGE_WAIT_LATCH.await();
-      } catch (InterruptedException e) {
-        LOG.error("Got exception: ", e);
-      }
-      super.append(edit);
-    }
-    @Override
-    public void append(List<TransactionEdit> edits) throws IOException {
-      try {
-        STORAGE_WAIT_LATCH.await();
-      } catch (InterruptedException e) {
-        LOG.error("Got exception: ", e);
-      }
-      super.append(edits);
-    }
-  }
diff --git a/tephra-core/src/test/java/co/cask/tephra/hbase/AbstractTransactionVisibilityFilterTest.java b/tephra-core/src/test/java/co/cask/tephra/hbase/AbstractTransactionVisibilityFilterTest.java
deleted file mode 100644
index 6427b07..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/hbase/AbstractTransactionVisibilityFilterTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.hbase;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.util.ConfigurationFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
-import org.junit.Before;
-import java.util.List;
-/**
- * Common test class for TransactionVisibilityFilter implementations.
- */
-public abstract class AbstractTransactionVisibilityFilterTest {
-  protected static final byte[] FAM = new byte[] {'f'};
-  protected static final byte[] FAM2 = new byte[] {'f', '2'};
-  protected static final byte[] FAM3 = new byte[] {'f', '3'};
-  protected static final byte[] COL = new byte[] {'c'};
-  protected static final List<byte[]> EMPTY_CHANGESET = Lists.newArrayListWithCapacity(0);
-  protected TransactionManager txManager;
-  @Before
-  public void setup() throws Exception {
-    Configuration conf = new ConfigurationFactory().get();
-    conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
-    txManager = new TransactionManager(conf);
-    txManager.startAndWait();
-  }
-  @After
-  public void tearDown() throws Exception {
-    txManager.stopAndWait();
-  }
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/AbstractTransactionStateStorageTest.java
deleted file mode 100644
index b2bf69c..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/AbstractTransactionStateStorageTest.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.persist;
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.util.TransactionEditUtil;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-/**
- * Commons tests to run against the {@link TransactionStateStorage} implementations.
- */
-public abstract class AbstractTransactionStateStorageTest {
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionStateStorageTest.class);
-  private static Random random = new Random();
-  protected abstract Configuration getConfiguration(String testName) throws IOException;
-  protected abstract AbstractTransactionStateStorage getStorage(Configuration conf);
-  @Test
-  public void testSnapshotPersistence() throws Exception {
-    Configuration conf = getConfiguration("testSnapshotPersistence");
-    TransactionSnapshot snapshot = createRandomSnapshot();
-    TransactionStateStorage storage = getStorage(conf);
-    try {
-      storage.startAndWait();
-      storage.writeSnapshot(snapshot);
-      TransactionSnapshot readSnapshot = storage.getLatestSnapshot();
-      assertNotNull(readSnapshot);
-      assertEquals(snapshot, readSnapshot);
-    } finally {
-      storage.stopAndWait();
-    }
-  }
-  @Test
-  public void testLogWriteAndRead() throws Exception {
-    Configuration conf = getConfiguration("testLogWriteAndRead");
-    // create some random entries
-    List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(100);
-    TransactionStateStorage storage = getStorage(conf);
-    try {
-      long now = System.currentTimeMillis();
-      storage.startAndWait();
-      TransactionLog log = storage.createLog(now);
-      for (TransactionEdit edit : edits) {
-        log.append(edit);
-      }
-      log.close();
-      Collection<TransactionLog> logsToRead = storage.getLogsSince(now);
-      // should only be our one log
-      assertNotNull(logsToRead);
-      assertEquals(1, logsToRead.size());
-      TransactionLogReader logReader = logsToRead.iterator().next().getReader();
-      assertNotNull(logReader);
-      List<TransactionEdit> readEdits = Lists.newArrayListWithExpectedSize(edits.size());
-      TransactionEdit nextEdit;
-      while ((nextEdit = logReader.next()) != null) {
-        readEdits.add(nextEdit);
-      }
-      logReader.close();
-      assertEquals(edits.size(), readEdits.size());
-      for (int i = 0; i < edits.size(); i++) {
-        LOG.info("Checking edit " + i);
-        assertEquals(edits.get(i), readEdits.get(i));
-      }
-    } finally {
-      storage.stopAndWait();
-    }
-  }
-  @Test
-  public void testTransactionManagerPersistence() throws Exception {
-    Configuration conf = getConfiguration("testTransactionManagerPersistence");
-    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
-    // start snapshot thread, but with long enough interval so we only get snapshots on shutdown
-    conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 600);
-    TransactionStateStorage storage = null;
-    TransactionStateStorage storage2 = null;
-    TransactionStateStorage storage3 = null;
-    try {
-      storage = getStorage(conf);
-      TransactionManager txManager = new TransactionManager
-        (conf, storage, new TxMetricsCollector());
-      txManager.startAndWait();
-      // TODO: replace with new persistence tests
-      final byte[] a = { 'a' };
-      final byte[] b = { 'b' };
-      // start a tx1, add a change A and commit
-      Transaction tx1 = txManager.startShort();
-      Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
-      Assert.assertTrue(txManager.commit(tx1));
-      // start a tx2 and add a change B
-      Transaction tx2 = txManager.startShort();
-      Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
-      // start a tx3
-      Transaction tx3 = txManager.startShort();
-      // restart
-      txManager.stopAndWait();
-      TransactionSnapshot origState = txManager.getCurrentState();
-      LOG.info("Orig state: " + origState);
-      Thread.sleep(100);
-      // starts a new tx manager
-      storage2 = getStorage(conf);
-      txManager = new TransactionManager(conf, storage2, new TxMetricsCollector());
-      txManager.startAndWait();
-      // check that the reloaded state matches the old
-      TransactionSnapshot newState = txManager.getCurrentState();
-      LOG.info("New state: " + newState);
-      assertEquals(origState, newState);
-      // commit tx2
-      Assert.assertTrue(txManager.commit(tx2));
-      // start another transaction, must be greater than tx3
-      Transaction tx4 = txManager.startShort();
-      Assert.assertTrue(tx4.getTransactionId() > tx3.getTransactionId());
-      // tx1 must be visble from tx2, but tx3 and tx4 must not
-      Assert.assertTrue(tx2.isVisible(tx1.getTransactionId()));
-      Assert.assertFalse(tx2.isVisible(tx3.getTransactionId()));
-      Assert.assertFalse(tx2.isVisible(tx4.getTransactionId()));
-      // add same change for tx3
-      Assert.assertFalse(txManager.canCommit(tx3, Collections.singleton(b)));
-      // check visibility with new xaction
-      Transaction tx5 = txManager.startShort();
-      Assert.assertTrue(tx5.isVisible(tx1.getTransactionId()));
-      Assert.assertTrue(tx5.isVisible(tx2.getTransactionId()));
-      Assert.assertFalse(tx5.isVisible(tx3.getTransactionId()));
-      Assert.assertFalse(tx5.isVisible(tx4.getTransactionId()));
-      // can commit tx3?
-      txManager.abort(tx3);
-      txManager.abort(tx4);
-      txManager.abort(tx5);
-      // start new tx and verify its exclude list is empty
-      Transaction tx6 = txManager.startShort();
-      Assert.assertFalse(tx6.hasExcludes());
-      txManager.abort(tx6);
-      // now start 5 x claim size transactions
-      Transaction tx = txManager.startShort();
-      for (int i = 1; i < 50; i++) {
-        tx = txManager.startShort();
-      }
-      origState = txManager.getCurrentState();
-      Thread.sleep(100);
-      // simulate crash by starting a new tx manager without a stopAndWait
-      storage3 = getStorage(conf);
-      txManager = new TransactionManager(conf, storage3, new TxMetricsCollector());
-      txManager.startAndWait();
-      // verify state again matches (this time should include WAL replay)
-      newState = txManager.getCurrentState();
-      assertEquals(origState, newState);
-      // get a new transaction and verify it is greater
-      Transaction txAfter = txManager.startShort();
-      Assert.assertTrue(txAfter.getTransactionId() > tx.getTransactionId());
-    } finally {
-      if (storage != null) {
-        storage.stopAndWait();
-      }
-      if (storage2 != null) {
-        storage2.stopAndWait();
-      }
-      if (storage3 != null) {
-        storage3.stopAndWait();
-      }
-    }
-  }
-  /**
-   * Tests whether the committed set is advanced properly on WAL replay.
-   */
-  @Test
-  public void testCommittedSetClearing() throws Exception {
-    Configuration conf = getConfiguration("testCommittedSetClearing");
-    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
-    conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 0); // no periodic snapshots
-    TransactionStateStorage storage1 = null;
-    TransactionStateStorage storage2 = null;
-    try {
-      storage1 = getStorage(conf);
-      TransactionManager txManager = new TransactionManager
-        (conf, storage1, new TxMetricsCollector());
-      txManager.startAndWait();
-      // TODO: replace with new persistence tests
-      final byte[] a = { 'a' };
-      final byte[] b = { 'b' };
-      // start a tx1, add a change A and commit
-      Transaction tx1 = txManager.startShort();
-      Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
-      Assert.assertTrue(txManager.commit(tx1));
-      // start a tx2 and add a change B
-      Transaction tx2 = txManager.startShort();
-      Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
-      // start a tx3
-      Transaction tx3 = txManager.startShort();
-      TransactionSnapshot origState = txManager.getCurrentState();
-      LOG.info("Orig state: " + origState);
-      // simulate a failure by starting a new tx manager without stopping first
-      storage2 = getStorage(conf);
-      txManager = new TransactionManager(conf, storage2, new TxMetricsCollector());
-      txManager.startAndWait();
-      // check that the reloaded state matches the old
-      TransactionSnapshot newState = txManager.getCurrentState();
-      LOG.info("New state: " + newState);
-      assertEquals(origState, newState);
-    } finally {
-      if (storage1 != null) {
-        storage1.stopAndWait();
-      }
-      if (storage2 != null) {
-        storage2.stopAndWait();
-      }
-    }
-  }
-  /**
-   * Tests removal of old snapshots and old transaction logs.
-   */
-  @Test
-  public void testOldFileRemoval() throws Exception {
-    Configuration conf = getConfiguration("testOldFileRemoval");
-    TransactionStateStorage storage = null;
-    try {
-      storage = getStorage(conf);
-      storage.startAndWait();
-      long now = System.currentTimeMillis();
-      long writePointer = 1;
-      Collection<Long> invalid = Lists.newArrayList();
-      NavigableMap<Long, TransactionManager.InProgressTx> inprogress = Maps.newTreeMap();
-      Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
-      Map<Long, Set<ChangeId>> committed = Maps.newHashMap();
-      TransactionSnapshot snapshot = new TransactionSnapshot(now, 0, writePointer++, invalid,
-                                                             inprogress, committing, committed);
-      TransactionEdit dummyEdit = TransactionEdit.createStarted(1, 0, Long.MAX_VALUE, TransactionType.SHORT);
-      // write snapshot 1
-      storage.writeSnapshot(snapshot);
-      TransactionLog log = storage.createLog(now);
-      log.append(dummyEdit);
-      log.close();
-      snapshot = new TransactionSnapshot(now + 1, 0, writePointer++, invalid, inprogress, committing, committed);
-      // write snapshot 2
-      storage.writeSnapshot(snapshot);
-      log = storage.createLog(now + 1);
-      log.append(dummyEdit);
-      log.close();
-      snapshot = new TransactionSnapshot(now + 2, 0, writePointer++, invalid, inprogress, committing, committed);
-      // write snapshot 3
-      storage.writeSnapshot(snapshot);
-      log = storage.createLog(now + 2);
-      log.append(dummyEdit);
-      log.close();
-      snapshot = new TransactionSnapshot(now + 3, 0, writePointer++, invalid, inprogress, committing, committed);
-      // write snapshot 4
-      storage.writeSnapshot(snapshot);
-      log = storage.createLog(now + 3);
-      log.append(dummyEdit);
-      log.close();
-      snapshot = new TransactionSnapshot(now + 4, 0, writePointer++, invalid, inprogress, committing, committed);
-      // write snapshot 5
-      storage.writeSnapshot(snapshot);
-      log = storage.createLog(now + 4);
-      log.append(dummyEdit);
-      log.close();
-      snapshot = new TransactionSnapshot(now + 5, 0, writePointer++, invalid, inprogress, committing, committed);
-      // write snapshot 6
-      storage.writeSnapshot(snapshot);
-      log = storage.createLog(now + 5);
-      log.append(dummyEdit);
-      log.close();
-      List<String> allSnapshots = storage.listSnapshots();
-      LOG.info("All snapshots: " + allSnapshots);
-      assertEquals(6, allSnapshots.size());
-      List<String> allLogs = storage.listLogs();
-      LOG.info("All logs: " + allLogs);
-      assertEquals(6, allLogs.size());
-      long oldestKept = storage.deleteOldSnapshots(3);
-      assertEquals(now + 3, oldestKept);
-      allSnapshots = storage.listSnapshots();
-      LOG.info("All snapshots: " + allSnapshots);
-      assertEquals(3, allSnapshots.size());
-      storage.deleteLogsOlderThan(oldestKept);
-      allLogs = storage.listLogs();
-      LOG.info("All logs: " + allLogs);
-      assertEquals(3, allLogs.size());
-    } finally {
-      if (storage != null) {
-        storage.stopAndWait();
-      }
-    }
-  }
-  @Test
-  public void testLongTxnEditReplay() throws Exception {
-    Configuration conf = getConfiguration("testLongTxnEditReplay");
-    TransactionStateStorage storage = null;
-    try {
-      storage = getStorage(conf);
-      storage.startAndWait();
-      // Create long running txns. Abort one of them, invalidate another, invalidate and abort the last.
-      long time1 = System.currentTimeMillis();
-      long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
-      TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG);
-      TransactionEdit edit2 = TransactionEdit.createAborted(wp1, TransactionType.LONG, null);
-      long time2 = time1 + 100;
-      long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
-      TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 100000, TransactionType.LONG);
-      TransactionEdit edit4 = TransactionEdit.createInvalid(wp2);
-      long time3 = time1 + 200;
-      long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
-      TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG);
-      TransactionEdit edit6 = TransactionEdit.createInvalid(wp3);
-      TransactionEdit edit7 = TransactionEdit.createAborted(wp3, TransactionType.LONG, null);
-      // write transaction edits
-      TransactionLog log = storage.createLog(time1);
-      log.append(edit1);
-      log.append(edit2);
-      log.append(edit3);
-      log.append(edit4);
-      log.append(edit5);
-      log.append(edit6);
-      log.append(edit7);
-      log.close();
-      // Start transaction manager
-      TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
-      txm.startAndWait();
-      try {
-        // Verify that all txns are in invalid list.
-        TransactionSnapshot snapshot1 = txm.getCurrentState();
-        Assert.assertEquals(ImmutableList.of(wp1, wp2, wp3), snapshot1.getInvalid());
-        Assert.assertEquals(0, snapshot1.getInProgress().size());
-        Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size());
-        Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size());
-      } finally {
-        txm.stopAndWait();
-      }
-    } finally {
-      if (storage != null) {
-        storage.stopAndWait();
-      }
-    }
-  }
-  @Test
-  public void testTruncateInvalidTxEditReplay() throws Exception {
-    Configuration conf = getConfiguration("testTruncateInvalidTxEditReplay");
-    TransactionStateStorage storage = null;
-    try {
-      storage = getStorage(conf);
-      storage.startAndWait();
-      // Create some txns, and invalidate all of them.
-      long time1 = System.currentTimeMillis();
-      long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
-      TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG);
-      TransactionEdit edit2 = TransactionEdit.createInvalid(wp1);
-      long time2 = time1 + 100;
-      long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
-      TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 10000, TransactionType.SHORT);
-      TransactionEdit edit4 = TransactionEdit.createInvalid(wp2);
-      long time3 = time1 + 2000;
-      long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
-      TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG);
-      TransactionEdit edit6 = TransactionEdit.createInvalid(wp3);
-      long time4 = time1 + 2100;
-      long wp4 = time4 * TxConstants.MAX_TX_PER_MS;
-      TransactionEdit edit7 = TransactionEdit.createStarted(wp4, wp4 - 10, time4 + 10000, TransactionType.SHORT);
-      TransactionEdit edit8 = TransactionEdit.createInvalid(wp4);
-      // remove wp1 and wp3 from invalid list
-      TransactionEdit edit9 = TransactionEdit.createTruncateInvalidTx(ImmutableSet.of(wp1, wp3));
-      // truncate invalid transactions before time3
-      TransactionEdit edit10 = TransactionEdit.createTruncateInvalidTxBefore(time3);
-      // write transaction edits
-      TransactionLog log = storage.createLog(time1);
-      log.append(edit1);
-      log.append(edit2);
-      log.append(edit3);
-      log.append(edit4);
-      log.append(edit5);
-      log.append(edit6);
-      log.append(edit7);
-      log.append(edit8);
-      log.append(edit9);
-      log.append(edit10);
-      log.close();
-      // Start transaction manager
-      TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
-      txm.startAndWait();
-      try {
-        // Only wp4 should be in invalid list.
-        TransactionSnapshot snapshot = txm.getCurrentState();
-        Assert.assertEquals(ImmutableList.of(wp4), snapshot.getInvalid());
-        Assert.assertEquals(0, snapshot.getInProgress().size());
-        Assert.assertEquals(0, snapshot.getCommittedChangeSets().size());
-        Assert.assertEquals(0, snapshot.getCommittedChangeSets().size());
-      } finally {
-        txm.stopAndWait();
-      }
-    } finally {
-      if (storage != null) {
-        storage.stopAndWait();
-      }
-    }
-  }
-  /**
-   * Generates a new snapshot object with semi-randomly populated values.  This does not necessarily accurately
-   * represent a typical snapshot's distribution of values, as we only set an upper bound on pointer values.
-   *
-   * We generate a new snapshot with the contents:
-   * <ul>
-   *   <li>readPointer = 1M + (random % 1M)</li>
-   *   <li>writePointer = readPointer + 1000</li>
-   *   <li>waterMark = writePointer + 1000</li>
-   *   <li>inProgress = one each for (writePointer - 500)..writePointer, ~ 5% "long" transaction</li>
-   *   <li>invalid = 100 randomly distributed, 0..1M</li>
-   *   <li>committing = one each, (readPointer + 1)..(readPointer + 100)</li>
-   *   <li>committed = one each, (readPointer - 1000)..readPointer</li>
-   * </ul>
-   * @return a new snapshot of transaction state.
-   */
-  private TransactionSnapshot createRandomSnapshot() {
-    // limit readPointer to a reasonable range, but make it > 1M so we can assign enough keys below
-    long readPointer = (Math.abs(random.nextLong()) % 1000000L) + 1000000L;
-    long writePointer = readPointer + 1000L;
-    // generate in progress -- assume last 500 write pointer values
-    NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
-    long startPointer = writePointer - 500L;
-    for (int i = 0; i < 500; i++) {
-      long currentTime = System.currentTimeMillis();
-      // make some "long" transactions
-      if (i % 20 == 0) {
-        inProgress.put(startPointer + i,
-                       new TransactionManager.InProgressTx(startPointer - 1, currentTime + TimeUnit.DAYS.toSeconds(1),
-                                                           TransactionType.LONG));
-      } else {
-        inProgress.put(startPointer + i,
-                       new TransactionManager.InProgressTx(startPointer - 1, currentTime + 300000L, 
-                                                           TransactionType.SHORT));
-      }
-    }
-    // make 100 random invalid IDs
-    LongArrayList invalid = new LongArrayList();
-    for (int i = 0; i < 100; i++) {
-      invalid.add(Math.abs(random.nextLong()) % 1000000L);
-    }
-    // make 100 committing entries, 10 keys each
-    Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
-    for (int i = 0; i < 100; i++) {
-      committing.put(readPointer + i, generateChangeSet(10));
-    }
-    // make 1000 committed entries, 10 keys each
-    long startCommitted = readPointer - 1000L;
-    NavigableMap<Long, Set<ChangeId>> committed = Maps.newTreeMap();
-    for (int i = 0; i < 1000; i++) {
-      committed.put(startCommitted + i, generateChangeSet(10));
-    }
-    return new TransactionSnapshot(System.currentTimeMillis(), readPointer, writePointer,
-                                   invalid, inProgress, committing, committed);
-  }
-  private Set<ChangeId> generateChangeSet(int numEntries) {
-    Set<ChangeId> changes = Sets.newHashSet();
-    for (int i = 0; i < numEntries; i++) {
-      byte[] bytes = new byte[8];
-      random.nextBytes(bytes);
-      changes.add(new ChangeId(bytes));
-    }
-    return changes;
-  }
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/CommitMarkerCodecTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/CommitMarkerCodecTest.java
deleted file mode 100644
index da876fa..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/CommitMarkerCodecTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.persist;
-import co.cask.tephra.TxConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-/**
- * Unit Test for {@link CommitMarkerCodec}.
- */
-public class CommitMarkerCodecTest {
-  @ClassRule
-  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
-  private static final String LOG_FILE = "txlog";
-  private static final Random RANDOM = new Random();
-  private static MiniDFSCluster dfsCluster;
-  private static Configuration conf;
-  private static FileSystem fs;
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    Configuration hConf = new Configuration();
-    hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath());
-    dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
-    conf = new Configuration(dfsCluster.getFileSystem().getConf());
-    fs = FileSystem.newInstance(FileSystem.getDefaultUri(conf), conf);
-  }
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    dfsCluster.shutdown();
-  }
-  @Test
-  public void testRandomCommitMarkers() throws Exception {
-    List<Integer> randomInts = new ArrayList<>();
-    Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
-    // Write a bunch of random commit markers
-    try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
-                                                                LongWritable.class,
-                                                                SequenceFile.CompressionType.NONE)) {
-      for (int i = 0; i < 1000; i++) {
-        int randomNum = RANDOM.nextInt(Integer.MAX_VALUE);
-        CommitMarkerCodec.writeMarker(writer, randomNum);
-        randomInts.add(randomNum);
-      }
-      writer.hflush();
-      writer.hsync();
-    }
-    // Read the commit markers back to verify the marker
-    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
-         CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
-      for (int num : randomInts) {
-        Assert.assertEquals(num, markerCodec.readMarker(reader));
-      }
-    }
-  }
-  private static class IncompleteValueBytes implements SequenceFile.ValueBytes {
-    @Override
-    public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
-      // don't write anything to simulate incomplete record
-    }
-    @Override
-    public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
-      throw new IllegalArgumentException("Not possible");
-    }
-    @Override
-    public int getSize() {
-      return Ints.BYTES;
-    }
-  }
-  @Test
-  public void testIncompleteCommitMarker() throws Exception {
-    Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
-    try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
-                                                                LongWritable.class,
-                                                                SequenceFile.CompressionType.NONE)) {
-      String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED;
-      SequenceFile.ValueBytes valueBytes = new IncompleteValueBytes();
-      writer.appendRaw(key.getBytes(), 0, key.length(), valueBytes);
-      writer.hflush();
-      writer.hsync();
-    }
-    // Read the incomplete commit marker
-    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
-         CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
-      try {
-        markerCodec.readMarker(reader);
-        Assert.fail("Expected EOF Exception to be thrown");
-      } catch (EOFException e) {
-        // expected since we didn't write the value bytes
-      }
-    }
-  }
-  @Test
-  public void testIncorrectCommitMarker() throws Exception {
-    Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
-    // Write an incorrect marker
-    try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
-                                                                LongWritable.class,
-                                                                SequenceFile.CompressionType.NONE)) {
-      String invalidKey = "IncorrectKey";
-      SequenceFile.ValueBytes valueBytes = new CommitMarkerCodec.CommitEntriesCount(100);
-      writer.appendRaw(invalidKey.getBytes(), 0, invalidKey.length(), valueBytes);
-      writer.hflush();
-      writer.hsync();
-    }
-    // Read the commit markers back to verify the marker
-    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
-         CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
-      try {
-        markerCodec.readMarker(reader);
-        Assert.fail("Expected an IOException to be thrown");
-      } catch (IOException e) {
-        // expected
-      }
-    }
-  }
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionLogTest.java
deleted file mode 100644
index 96015d1..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionLogTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.persist;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.util.TransactionEditUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-/**
- * Testing for complete and partial syncs of {@link TransactionEdit} to {@link HDFSTransactionLog}
- */
-public class HDFSTransactionLogTest {
-  @ClassRule
-  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
-  private static final String LOG_FILE_PREFIX = "txlog.";
-  private static MiniDFSCluster dfsCluster;
-  private static Configuration conf;
-  private static MetricsCollector metricsCollector;
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    Configuration hConf = new Configuration();
-    hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath());
-    dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
-    conf = new Configuration(dfsCluster.getFileSystem().getConf());
-    metricsCollector = new TxMetricsCollector();
-  }
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    dfsCluster.shutdown();
-  }
-  private Configuration getConfiguration() throws IOException {
-    // tests should use the current user for HDFS
-    conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
-    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, TMP_FOLDER.newFolder().getAbsolutePath());
-    return conf;
-  }
-  private HDFSTransactionLog getHDFSTransactionLog(Configuration conf,
-                                                   FileSystem fs, long timeInMillis) throws Exception {
-    String snapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
-    Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
-    return new HDFSTransactionLog(fs, conf, newLog, timeInMillis, metricsCollector);
-  }
-  private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs,
-                                                    long timeInMillis, boolean withMarker) throws IOException {
-    String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
-    Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
-    SequenceFile.Metadata metadata = new SequenceFile.Metadata();
-    if (withMarker) {
-      metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
-                   new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
-    }
-    return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
-                                     TransactionEdit.class, SequenceFile.CompressionType.NONE, null, null, metadata);
-  }
-  private void writeNumWrites(SequenceFile.Writer writer, final int size) throws Exception {
-    String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED;
-    CommitMarkerCodec.writeMarker(writer, size);
-  }
-  private void testTransactionLogSync(int totalCount, int batchSize, boolean withMarker, boolean isComplete)
-    throws Exception {
-    List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
-    long timestamp = System.currentTimeMillis();
-    Configuration configuration = getConfiguration();
-    FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
-    SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, withMarker);
-    AtomicLong logSequence = new AtomicLong();
-    HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
-    AbstractTransactionLog.Entry entry;
-    for (int i = 0; i < totalCount - batchSize; i += batchSize) {
-      if (withMarker) {
-        writeNumWrites(writer, batchSize);
-      }
-      for (int j = 0; j < batchSize; j++) {
-        entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j));
-        writer.append(entry.getKey(), entry.getEdit());
-      }
-      writer.syncFs();
-    }
-    if (withMarker) {
-      writeNumWrites(writer, batchSize);
-    }
-    for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
-      entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
-      writer.append(entry.getKey(), entry.getEdit());
-    }
-    entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()),
-                                             edits.get(totalCount - 1));
-    if (isComplete) {
-      writer.append(entry.getKey(), entry.getEdit());
-    } else {
-      byte[] bytes = Longs.toByteArray(entry.getKey().get());
-      writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() {
-        @Override
-        public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
-          byte[] test = new byte[]{0x2};
-          outStream.write(test, 0, 1);
-        }
-        @Override
-        public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
-          // no-op
-        }
-        @Override
-        public int getSize() {
-          // mimic size longer than the actual byte array size written, so we would reach EOF
-          return 12;
-        }
-      });
-    }
-    writer.syncFs();
-    Closeables.closeQuietly(writer);
-    // now let's try to read this log
-    TransactionLogReader reader = transactionLog.getReader();
-    int syncedEdits = 0;
-    while (reader.next() != null) {
-      // testing reading the transaction edits
-      syncedEdits++;
-    }
-    if (isComplete) {
-      Assert.assertEquals(totalCount, syncedEdits);
-    } else {
-      Assert.assertEquals(totalCount - batchSize, syncedEdits);
-    }
-  }
-  @Test
-  public void testTransactionLogNewVersion() throws Exception {
-    // in-complete sync
-    testTransactionLogSync(1000, 1, true, false);
-    testTransactionLogSync(2000, 5, true, false);
-    // complete sync
-    testTransactionLogSync(1000, 1, true, true);
-    testTransactionLogSync(2000, 5, true, true);
-  }
-  @Test
-  public void testTransactionLogOldVersion() throws Exception {
-    // in-complete sync
-    testTransactionLogSync(1000, 1, false, false);
-    // complete sync
-    testTransactionLogSync(2000, 5, false, true);
-  }
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
deleted file mode 100644
index 30ce455..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.persist;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-/**
- * Tests persistence of transaction snapshots and write-ahead logs to HDFS storage, using the
- * {@link HDFSTransactionStateStorage} and {@link HDFSTransactionLog} implementations.
- */
-public class HDFSTransactionStateStorageTest extends AbstractTransactionStateStorageTest {
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-  private static MiniDFSCluster dfsCluster;
-  private static Configuration conf;
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    Configuration hConf = new Configuration();
-    hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
-    dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
-    conf = new Configuration(dfsCluster.getFileSystem().getConf());
-  }
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    dfsCluster.shutdown();
-  }
-  @Override
-  protected Configuration getConfiguration(String testName) throws IOException {
-    // tests should use the current user for HDFS
-    conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
-    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
-    return conf;
-  }
-  @Override
-  protected AbstractTransactionStateStorage getStorage(Configuration conf) {
-    return new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
-  }
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/InMemoryTransactionStateStorage.java b/tephra-core/src/test/java/co/cask/tephra/persist/InMemoryTransactionStateStorage.java
deleted file mode 100644
index 1b69c7a..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/InMemoryTransactionStateStorage.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.persist;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import javax.annotation.Nullable;
-/**
- * Stores the latest transaction snapshot and logs in memory.
- */
-public class InMemoryTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
-  // only keeps the most recent snapshot in memory
-  private TransactionSnapshot lastSnapshot;
-  private NavigableMap<Long, TransactionLog> logs = new TreeMap<>();
-  /**
-   * Service start hook; this implementation performs no work.
-   */
-  @Override
-  protected void startUp() throws Exception {
-  }
-  /**
-   * Service stop hook: discards all in-memory state by replacing the log map with an empty one and
-   * dropping the reference to the last snapshot.
-   */
-  @Override
-  protected void shutDown() throws Exception {
-    logs = new TreeMap<>();
-    lastSnapshot = null;
-  }
-  /**
-   * Does nothing: the snapshot is neither serialized to {@code out} nor retained.
-   */
-  @Override
-  public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
-    // no codecs in in-memory mode
-  }
-  /**
-   * Remembers the given snapshot as the latest one, replacing any previously stored snapshot.
-   * The reference is stored as-is; no copy is made.
-   */
-  @Override
-  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
-    lastSnapshot = snapshot;
-  }
-  /**
-   * Returns the most recently written snapshot, or {@code null} if none has been written
-   * (or the service has been shut down since).
-   */
-  @Override
-  public TransactionSnapshot getLatestSnapshot() throws IOException {
-    return lastSnapshot;
-  }
-  /**
-   * Returns the same object as {@link #getLatestSnapshot()}, typed as its visibility state;
-   * {@code null} if no snapshot is stored.
-   */
-  @Override
-  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
-    return lastSnapshot;
-  }
-  /**
-   * Deletes nothing, since only one snapshot is ever held; {@code numberToKeep} is ignored.
-   *
-   * @return the timestamp of the stored snapshot
-   */
-  @Override
-  public long deleteOldSnapshots(int numberToKeep) throws IOException {
-    // always only keep the last snapshot
-    // NOTE(review): lastSnapshot is null until writeSnapshot() is called (and again after shutDown()),
-    // so this throws NullPointerException in that state -- confirm whether callers can reach it.
-    return lastSnapshot.getTimestamp();
-  }
-  /**
-   * Lists the stored snapshot by name.
-   *
-   * @return a list holding the decimal timestamp of the last snapshot, or an empty list if there is none
-   */
-  @Override
-  public List<String> listSnapshots() throws IOException {
-    List<String> names = Lists.newArrayListWithCapacity(1);
-    if (lastSnapshot == null) {
-      return names;
-    }
-    names.add(Long.toString(lastSnapshot.getTimestamp()));
-    return names;
-  }
-  /**
-   * Returns a new list of the logs whose creation timestamp is greater than or equal to {@code timestamp},
-   * in ascending timestamp order.
-   */
-  @Override
-  public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
-    Map<Long, TransactionLog> fromTimestamp = logs.tailMap(timestamp);
-    return Lists.newArrayList(fromTimestamp.values());
-  }
-  /**
-   * Creates a new in-memory log and registers it under {@code timestamp}, replacing any log
-   * previously registered with the same timestamp.
-   */
-  @Override
-  public TransactionLog createLog(long timestamp) throws IOException {
-    InMemoryTransactionLog created = new InMemoryTransactionLog(timestamp);
-    logs.put(timestamp, created);
-    return created;
-  }
-  /**
-   * Removes every log whose timestamp is strictly less than {@code timestamp}.
-   *
-   * @param timestamp logs registered under this timestamp or a later one are kept
-   */
-  @Override
-  public void deleteLogsOlderThan(long timestamp) throws IOException {
-    // headMap() is a live view of the keys strictly below the bound; clearing it removes them from logs.
-    logs.headMap(timestamp).clear();
-  }
-  /**
-   * No-op in this implementation.
-   */
-  @Override
-  public void setupStorage() throws IOException {
-  }
-  /**
-   * Lists the registered logs by name, where each name is the decimal form of the log's timestamp key,
-   * in ascending key order.
-   */
-  @Override
-  public List<String> listLogs() throws IOException {
-    // copy the keys first so the returned list is detached from later changes to the log map
-    List<Long> logTimestamps = Lists.newArrayList(logs.keySet());
-    Function<Long, String> toName = new Function<Long, String>() {
-      @Nullable
-      @Override
-      public String apply(@Nullable Long input) {
-        return input.toString();
-      }
-    };
-    return Lists.transform(logTimestamps, toName);
-  }
-  /**
-   * Returns the fixed label {@code "in-memory"} as this storage's location.
-   */
-  @Override
-  public String getLocation() {
-    return "in-memory";
-  }
-  /**
-   * A {@link TransactionLog} that accumulates its edits in an in-memory list.
-   */
-  public static class InMemoryTransactionLog implements TransactionLog {
-    private final long timestamp;
-    private final List<TransactionEdit> edits = Lists.newArrayList();
-    boolean isClosed = false;
-
-    /**
-     * @param timestamp the creation timestamp reported by {@link #getTimestamp()} and used in the log name
-     */
-    public InMemoryTransactionLog(long timestamp) {
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public String getName() {
-      return "in-memory@" + timestamp;
-    }
-
-    @Override
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    /**
-     * Appends a single edit.
-     *
-     * @throws IOException if the log has been closed
-     */
-    @Override
-    public void append(TransactionEdit edit) throws IOException {
-      ensureOpen();
-      edits.add(edit);
-    }
-
-    /**
-     * Appends all given edits, in order.
-     *
-     * @throws IOException if the log has been closed
-     */
-    @Override
-    public void append(List<TransactionEdit> edits) throws IOException {
-      ensureOpen();
-      // The parameter shadows the field: without "this." the caller's list would be appended to itself
-      // and nothing would be recorded in the log.
-      this.edits.addAll(edits);
-    }
-
-    /**
-     * Marks the log closed; later appends fail. Edits already recorded remain readable.
-     */
-    @Override
-    public void close() {
-      isClosed = true;
-    }
-
-    /**
-     * Returns a reader over the edits recorded so far, backed by an iterator of the internal list.
-     */
-    @Override
-    public TransactionLogReader getReader() throws IOException {
-      return new InMemoryLogReader(edits.iterator());
-    }
-
-    /** Rejects writes once {@link #close()} has been called. */
-    private void ensureOpen() throws IOException {
-      if (isClosed) {
-        throw new IOException("Log is closed");
-      }
-    }
-  }
-  /**
-   * A {@link TransactionLogReader} that serves edits from a supplied iterator.
-   */
-  public static class InMemoryLogReader implements TransactionLogReader {
-    private final Iterator<TransactionEdit> editIterator;
-
-    /**
-     * @param editIterator source of the edits handed out by {@link #next()}
-     */
-    public InMemoryLogReader(Iterator<TransactionEdit> editIterator) {
-      this.editIterator = editIterator;
-    }
-
-    /**
-     * Returns the next edit from the iterator, or {@code null} once it is exhausted.
-     */
-    @Override
-    public TransactionEdit next() throws IOException {
-      if (editIterator.hasNext()) {
-        return editIterator.next();
-      }
-      return null;
-    }
-
-    /**
-     * Same as {@link #next()}; the {@code reuse} instance is ignored.
-     */
-    @Override
-    public TransactionEdit next(TransactionEdit reuse) throws IOException {
-      return next();
-    }
-
-    /**
-     * Nothing to release.
-     */
-    @Override
-    public void close() throws IOException {
-    }
-  }
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/ b/tephra-core/src/test/java/co/cask/tephra/persist/
deleted file mode 100644
index e2886ae..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/persist/
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.persist;
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.snapshot.DefaultSnapshotCodec;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import co.cask.tephra.snapshot.SnapshotCodecV4;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import java.util.Collection;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-/**
- * Runs transaction persistence tests against the {@link LocalFileTransactionStateStorage} and
- * {@link LocalFileTransactionLog} implementations.
- */
-public class LocalTransactionStateStorageTest extends AbstractTransactionStateStorageTest {
-  @ClassRule
-  public static TemporaryFolder tmpDir = new TemporaryFolder();
-  /**
-   * Creates a fresh configuration whose local snapshot directory is a new temporary folder named after
-   * the test, and which selects {@link SnapshotCodecV4} as the snapshot codec.
-   */
-  @Override
-  protected Configuration getConfiguration(String testName) throws IOException {
-    String snapshotDir = tmpDir.newFolder(testName).getAbsolutePath();
-    Configuration localConf = new Configuration();
-    localConf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, snapshotDir);
-    localConf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
-    return localConf;
-  }
-  /**
-   * Builds the local-file-backed storage under test from the given configuration.
-   */
-  @Override
-  protected AbstractTransactionStateStorage getStorage(Configuration conf) {
-    SnapshotCodecProvider codecProvider = new SnapshotCodecProvider(conf);
-    TxMetricsCollector metricsCollector = new TxMetricsCollector();
-    return new LocalFileTransactionStateStorage(conf, codecProvider, metricsCollector);
-  }
-  // A TransactionEdit that always serializes itself with the V2 edit codec, used to produce
-  // old-format log entries for the backwards-compatibility tests.
-  @SuppressWarnings("deprecation")
-  private class TransactionEditV2 extends TransactionEdit {
-    public TransactionEditV2(long writePointer, long visibilityUpperBound, State state, long expirationDate,
-                             Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type) {
-      // the four trailing super-constructor arguments are left at null / 0
-      super(writePointer, visibilityUpperBound, state, expirationDate,
-            changes, commitPointer, canCommit, type,
-            null, 0L, 0L, null);
-    }
-    @Override
-    public void write(DataOutput out) throws IOException {
-      TransactionEditCodecs.TransactionEditCodecV2 v2Codec = new TransactionEditCodecs.TransactionEditCodecV2();
-      TransactionEditCodecs.encode(this, out, v2Codec);
-    }
-  }
-  // Verifies that in-progress transactions persisted in the old format -- where a long-running transaction
-  // was recorded with an expiration of -1 -- are loaded by the TransactionManager as LONG transactions with
-  // an expiration of (start time + configured long timeout), while entries with an explicit expiration
-  // are loaded as SHORT transactions with that expiration.
-  //
-  // Note: this test cannot run in AbstractTransactionStateStorageTest, since SequenceFile throws exception saying
-  // TransactionEditV2 is not TransactionEdit. Since the code path this test is verifying is the same path between
-  // HDFS and Local Storage, having this only over here is fine.
-  @SuppressWarnings("deprecation")
-  @Test
-  public void testLongTxnBackwardsCompatibility() throws Exception {
-    Configuration conf = getConfiguration("testLongTxnBackwardsCompatibility");
-    // Use SnapshotCodec version 1
-    // (remember the configured codec so it can be restored before the transaction manager starts)
-    String latestSnapshotCodec = conf.get(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
-    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
-    TransactionStateStorage storage = null;
-    try {
-      storage = getStorage(conf);
-      storage.startAndWait();
-      // Create transaction snapshot and transaction edits with version when long running txns had -1 expiration.
-      Collection<Long> invalid = Lists.newArrayList();
-      NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
-      // snapshot entry 1: old-style long-running transaction (expiration -1)
-      long time1 = System.currentTimeMillis();
-      long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
-      inProgress.put(wp1, new TransactionManager.InProgressTx(wp1 - 5, -1L));
-      // snapshot entry 2: transaction with an explicit expiration
-      long time2 = time1 + 100;
-      long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
-      inProgress.put(wp2, new TransactionManager.InProgressTx(wp2 - 50, time2 + 1000));
-      Map<Long, Set<ChangeId>> committing = Maps.newHashMap();
-      Map<Long, Set<ChangeId>> committed = Maps.newHashMap();
-      TransactionSnapshot snapshot = new TransactionSnapshot(time2, 0, wp2, invalid,
-                                                             inProgress, committing, committed);
-      // log edit 1: old-style long-running transaction (expiration -1, no type)
-      long time3 = time1 + 200;
-      long wp3 = time3 * TxConstants.MAX_TX_PER_MS;
-      TransactionEdit edit1 = new TransactionEditV2(wp3, wp3 - 10, TransactionEdit.State.INPROGRESS, -1L,
-                                                    null, 0L, false, null);
-      // log edit 2: transaction with an explicit expiration (no type)
-      long time4 = time1 + 300;
-      long wp4 = time4  * TxConstants.MAX_TX_PER_MS;
-      TransactionEdit edit2 = new TransactionEditV2(wp4, wp4 - 10, TransactionEdit.State.INPROGRESS, time4 + 1000,
-                                                    null, 0L, false, null);
-      // write snapshot and transaction edit
-      storage.writeSnapshot(snapshot);
-      TransactionLog log = storage.createLog(time2);
-      log.append(edit1);
-      log.append(edit2);
-      log.close();
-      // Start transaction manager
-      // (with the originally configured snapshot codec restored)
-      conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, latestSnapshotCodec);
-      long longTimeout = TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.Manager.CFG_TX_LONG_TIMEOUT,
-                                                                TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT));
-      TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
-      txm.startAndWait();
-      try {
-        // Verify that the txns in old format were read correctly.
-        // There should be four in-progress transactions, and no invalid transactions
-        TransactionSnapshot snapshot1 = txm.getCurrentState();
-        Assert.assertEquals(ImmutableSortedSet.of(wp1, wp2, wp3, wp4), snapshot1.getInProgress().keySet());
-        verifyInProgress(snapshot1.getInProgress().get(wp1), TransactionType.LONG, time1 + longTimeout);
-        verifyInProgress(snapshot1.getInProgress().get(wp2), TransactionType.SHORT, time2 + 1000);
-        verifyInProgress(snapshot1.getInProgress().get(wp3), TransactionType.LONG, time3 + longTimeout);
-        verifyInProgress(snapshot1.getInProgress().get(wp4), TransactionType.SHORT, time4 + 1000);
-        Assert.assertEquals(0, snapshot1.getInvalid().size());
-      } finally {
-        txm.stopAndWait();
-      }
-    } finally {
-      if (storage != null) {
-        storage.stopAndWait();
-      }
-    }
-  }
-  // Verifies that old-format (V2) ABORTED edits, which carry no transaction type, are replayed by the
-  // TransactionManager such that both affected transactions end up in the invalid list.
-  //
-  // Note: this test cannot run in AbstractTransactionStateStorageTest, since SequenceFile throws exception saying
-  // TransactionEditV2 is not TransactionEdit. Since the code path this test is verifying is the same path between
-  // HDFS and Local Storage, having this only over here is fine.
-  @SuppressWarnings("deprecation")
-  @Test
-  public void testAbortEditBackwardsCompatibility() throws Exception {
-    Configuration conf = getConfiguration("testAbortEditBackwardsCompatibility");
-    TransactionStateStorage storage = null;
-    try {
-      storage = getStorage(conf);
-      storage.startAndWait();
-      // Create old-format edits: an in-progress edit followed by an abort edit for each transaction
-      // first transaction: started with expiration -1, then aborted
-      long time1 = System.currentTimeMillis();
-      long wp1 = time1 * TxConstants.MAX_TX_PER_MS;
-      TransactionEdit edit1 = new TransactionEditV2(wp1, wp1 - 10, TransactionEdit.State.INPROGRESS, -1L,
-                                                    null, 0L, false, null);
-      TransactionEdit edit2 = new TransactionEditV2(wp1, 0L, TransactionEdit.State.ABORTED, 0L,
-                                                    null, 0L, false, null);
-      // second transaction: started with an explicit expiration, invalidated, then aborted
-      long time2 = time1 + 400;
-      long wp2 = time2 * TxConstants.MAX_TX_PER_MS;
-      TransactionEdit edit3 = new TransactionEditV2(wp2, wp2 - 10, TransactionEdit.State.INPROGRESS, time2 + 10000,
-                                                    null, 0L, false, null);
-      TransactionEdit edit4 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.INVALID, 0L, null, 0L, false, null);
-      // Simulate case where we cannot determine txn state during abort
-      TransactionEdit edit5 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.ABORTED, 0L, null, 0L, false, null);
-      // write the transaction edits to a log (this test writes no snapshot)
-      TransactionLog log = storage.createLog(time1);
-      log.append(edit1);
-      log.append(edit2);
-      log.append(edit3);
-      log.append(edit4);
-      log.append(edit5);
-      log.close();
-      // Start transaction manager
-      TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector());
-      txm.startAndWait();
-      try {
-        // Verify that the txns in old format were read correctly.
-        // Both transactions should be in invalid state
-        TransactionSnapshot snapshot1 = txm.getCurrentState();
-        Assert.assertEquals(ImmutableList.of(wp1, wp2), snapshot1.getInvalid());
-        Assert.assertEquals(0, snapshot1.getInProgress().size());
-        Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size());
-        Assert.assertEquals(0, snapshot1.getCommittingChangeSets().size());
-      } finally {
-        txm.stopAndWait();
-      }
-    } finally {
-      if (storage != null) {
-        storage.stopAndWait();
-      }
-    }
-  }
-  /**
-   * Asserts that an in-progress transaction has the expected type and expiration.
-   *
-   * @param inProgressTx the in-progress entry to check
-   * @param type the expected transaction type
-   * @param expiration the expected expiration value
-   */
-  private void verifyInProgress(TransactionManager.InProgressTx inProgressTx, TransactionType type,
-                                long expiration) throws Exception {
-    Assert.assertEquals(type, inProgressTx.getType());
-    long actualExpiration = inProgressTx.getExpiration();
-    // include both values so that a failure shows what was actually read back
-    Assert.assertTrue("expected expiration " + expiration + " but was " + actualExpiration,
-                      actualExpiration == expiration);
-  }