You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:49 UTC

[49/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java b/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
deleted file mode 100644
index 3f51956..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
+++ /dev/null
@@ -1,1398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.metrics.DefaultMetricsCollector;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.persist.NoOpTransactionStateStorage;
-import co.cask.tephra.persist.TransactionEdit;
-import co.cask.tephra.persist.TransactionLog;
-import co.cask.tephra.persist.TransactionLogReader;
-import co.cask.tephra.persist.TransactionSnapshot;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import co.cask.tephra.util.TxUtils;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.inject.Inject;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-/**
- * This is the central place to manage all active transactions in the system.
- *
- * A transaction consists of
- * <ul>
- *   <li>A write pointer: This is the version used for all writes of that transaction.</li>
- *   <li>A read pointer: All reads under the transaction use this as an upper bound for the version.</li>
- *   <li>A set of excluded versions: These are the write versions of other transactions that must be excluded from
- *   reads, because those transactions are still in progress, or they failed but couldn't be properly rolled back.</li>
- * </ul>
- * To use the transaction system, a client must follow this sequence of steps:
- * <ol>
- *   <li>Request a new transaction.</li>
- *   <li>Use the transaction to read and write datasets. Datasets are encouraged to cache the writes of the
- *     transaction in memory, to reduce the cost of rollback in case the transaction fails. </li>
- *   <li>Check whether the transaction has conflicts. For this, the set of change keys are submitted via canCommit(),
- *     and the transaction manager verifies that none of these keys are in conflict with other transactions that
- *     committed since the start of this transaction.</li>
- *   <li>If the transaction has conflicts:
- *   <ol>
- *     <li>Roll back the changes in every dataset that was changed. This can happen in-memory if the
- *       changes were cached.</li>
- *     <li>Abort the transaction to remove it from the active transactions.</li>
- *   </ol>
- *   <li>If the transaction has no conflicts:</li>
- *   <ol>
- *     <li>Persist all datasets changes to storage.</li>
- *     <li>Commit the transaction. This will repeat the conflict detection, because more overlapping transactions
- *       may have committed since the first conflict check.</li>
- *     <li>If the transaction has conflicts:</li>
- *     <ol>
- *       <li>Roll back the changes in every dataset that was changed. This is more expensive because
- *         changes must be undone in persistent storage.</li>
- *       <li>Abort the transaction to remove it from the active transactions.</li>
- *     </ol>
- *   </ol>
- * </ol>
- * Transactions may be short or long-running. A short transaction is started with a timeout, and if it is not
- * committed before that timeout, it is invalidated and excluded from future reads. A long-running transaction has
- * no timeout and will remain active until it is committed or aborted. Long transactions are typically used in
- * map/reduce jobs and can produce enormous amounts of changes. Therefore, long transactions do not participate in
- * conflict detection (they would almost always have conflicts). We also assume that the changes of long transactions
- * are not tracked, and therefore cannot be rolled back. Hence, when a long transaction is aborted, it remains in the
- * list of excluded transactions to make its writes invisible.
- */
-public class TransactionManager extends AbstractService {
  // todo: optimize heavily

  private static final Logger LOG = LoggerFactory.getLogger(TransactionManager.class);

  // poll every 1 second to check whether a snapshot is needed
  private static final long SNAPSHOT_POLL_INTERVAL = 1000L;

  // poll every 10 seconds to emit metrics
  private static final long METRICS_POLL_INTERVAL = 10000L;

  // shared empty array handed out whenever the invalid list is empty
  private static final long[] NO_INVALID_TX = { };

  // Transactions that are in progress, with their info.
  private final NavigableMap<Long, InProgressTx> inProgress = new ConcurrentSkipListMap<Long, InProgressTx>();

  // the list of transactions that are invalid (not properly committed/aborted, or timed out).
  // Two representations are kept: 'invalid' is the mutable, sorted list that is updated under
  // the manager's monitor, while 'invalidArray' is a snapshot of it that can be handed out
  // without copying; it is replaced wholesale whenever 'invalid' changes.
  private final LongArrayList invalid = new LongArrayList();
  private long[] invalidArray = NO_INVALID_TX;

  // todo: use moving array instead (use Long2ObjectMap<byte[]> in fastutil)
  // todo: should this be consolidated with inProgress?
  // commit time next writePointer -> changes made by this tx
  private final NavigableMap<Long, Set<ChangeId>> committedChangeSets =
    new ConcurrentSkipListMap<Long, Set<ChangeId>>();
  // change sets of transactions that have passed canCommit() but are not committed yet
  private final Map<Long, Set<ChangeId>> committingChangeSets = Maps.newConcurrentMap();

  // upper bound of versions visible to newly started transactions
  private long readPointer;
  // highest write pointer (transaction id) handed out so far
  private long lastWritePointer;
  private MetricsCollector txMetricsCollector;

  // durable storage for snapshots and write-ahead logs
  private final TransactionStateStorage persistor;

  private final int cleanupInterval;
  private final int defaultTimeout;
  private final int defaultLongTimeout;
  private DaemonThreadExecutor cleanupThread = null;

  private volatile TransactionLog currentLog;

  // timestamp of the last completed snapshot
  private long lastSnapshotTime;
  // frequency in seconds to perform snapshots (it is multiplied by 1000 when compared to millis)
  private final long snapshotFrequencyInSeconds;
  // number of most recent snapshots to retain
  private final int snapshotRetainCount;
  private DaemonThreadExecutor snapshotThread;
  private DaemonThreadExecutor metricsThread;

  // lock guarding change of the current transaction log: appends take the read lock,
  // rolling the log takes the write lock
  private final ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
  private final Lock logReadLock = logLock.readLock();
  private final Lock logWriteLock = logLock.writeLock();

  // fudge factor (in milliseconds) used when interpreting transactions as LONG based on expiration
  // TODO: REMOVE WITH txnBackwardsCompatCheck()
  private final long longTimeoutTolerance;
-
  /**
   * Creates a transaction manager with a no-op state storage (no transaction state is
   * persisted) and the default metrics collector.
   *
   * @param config configuration used for timeouts and snapshot settings
   */
  public TransactionManager(Configuration config) {
    this(config, new NoOpTransactionStateStorage(new SnapshotCodecProvider(config)), new DefaultMetricsCollector());
  }
-
  /**
   * Creates a transaction manager.
   *
   * @param conf configuration providing timeout, snapshot, and cleanup settings
   * @param persistor storage for transaction snapshots and write-ahead logs
   * @param txMetricsCollector collector used to emit transaction metrics
   */
  @Inject
  public TransactionManager(Configuration conf, @Nonnull TransactionStateStorage persistor,
                            MetricsCollector txMetricsCollector) {
    this.persistor = persistor;
    cleanupInterval = conf.getInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL,
                                  TxConstants.Manager.DEFAULT_TX_CLEANUP_INTERVAL);
    defaultTimeout = conf.getInt(TxConstants.Manager.CFG_TX_TIMEOUT,
                                 TxConstants.Manager.DEFAULT_TX_TIMEOUT);
    defaultLongTimeout = conf.getInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT,
                                 TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT);
    snapshotFrequencyInSeconds = conf.getLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL,
                                              TxConstants.Manager.DEFAULT_TX_SNAPSHOT_INTERVAL);
    // must always keep at least 1 snapshot
    snapshotRetainCount = Math.max(conf.getInt(TxConstants.Manager.CFG_TX_SNAPSHOT_RETAIN,
                                               TxConstants.Manager.DEFAULT_TX_SNAPSHOT_RETAIN), 1);

    // intentionally not using a constant, as this config should not be exposed
    // TODO: REMOVE WITH txnBackwardsCompatCheck()
    longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 10000);

    this.txMetricsCollector = txMetricsCollector;
    this.txMetricsCollector.configure(conf);
    // start from a clean in-memory state; recovery happens in doStart()
    clear();
  }
-
-  private void clear() {
-    invalid.clear();
-    invalidArray = NO_INVALID_TX;
-    inProgress.clear();
-    committedChangeSets.clear();
-    committingChangeSets.clear();
-    lastWritePointer = 0;
-    readPointer = 0;
-    lastSnapshotTime = 0;
-  }
-
-  private boolean isStopping() {
-    return State.STOPPING.equals(state());
-  }
-
-  @Override
-  public synchronized void doStart() {
-    LOG.info("Starting transaction manager.");
-    txMetricsCollector.start();
-    // start up the persistor
-    persistor.startAndWait();
-    try {
-      persistor.setupStorage();
-    } catch (IOException e) {
-      Throwables.propagate(e);
-    }
-    // establish defaults in case there is no persistence
-    clear();
-    // attempt to recover state from last run
-    recoverState();
-    // start the periodic cleanup thread
-    startCleanupThread();
-    startSnapshotThread();
-    startMetricsThread();
-    // initialize the WAL if we did not force a snapshot in recoverState()
-    initLog();
-    // initialize next write pointer if needed
-    if (lastWritePointer == 0) {
-      lastWritePointer = getNextWritePointer();
-      readPointer = lastWritePointer;
-    }
-
-    notifyStarted();
-  }
-
  /**
   * Lazily creates the current write-ahead log if none was established during
   * state recovery. Wraps any I/O failure in an unchecked exception.
   */
  private void initLog() {
    if (currentLog == null) {
      try {
        currentLog = persistor.createLog(System.currentTimeMillis());
      } catch (IOException ioe) {
        throw Throwables.propagate(ioe);
      }
    }
  }
-
-  private void startCleanupThread() {
-    if (cleanupInterval <= 0 || defaultTimeout <= 0) {
-      return;
-    }
-    LOG.info("Starting periodic timed-out transaction cleanup every " + cleanupInterval +
-               " seconds with default timeout of " + defaultTimeout + " seconds.");
-    this.cleanupThread = new DaemonThreadExecutor("tx-clean-timeout") {
-      @Override
-      public void doRun() {
-        cleanupTimedOutTransactions();
-      }
-
-      @Override
-      public long getSleepMillis() {
-        return cleanupInterval * 1000;
-      }
-    };
-    cleanupThread.start();
-  }
-
  /**
   * Starts the daemon thread that periodically snapshots the transaction state.
   * Disabled when the configured frequency is not positive. The thread polls every
   * second (SNAPSHOT_POLL_INTERVAL) but doRun() enforces the configured frequency;
   * on shutdown it writes one final snapshot.
   */
  private void startSnapshotThread() {
    if (snapshotFrequencyInSeconds > 0) {
      LOG.info("Starting periodic snapshot thread, frequency = " + snapshotFrequencyInSeconds +
          " seconds, location = " + persistor.getLocation());
      this.snapshotThread = new DaemonThreadExecutor("tx-snapshot") {
        @Override
        public void doRun() {
          long currentTime = System.currentTimeMillis();
          // only snapshot when at least snapshotFrequencyInSeconds have elapsed since the last one
          if (lastSnapshotTime < (currentTime - snapshotFrequencyInSeconds * 1000)) {
            try {
              doSnapshot(false);
            } catch (IOException ioe) {
              LOG.error("Periodic snapshot failed!", ioe);
            }
          }
        }

        @Override
        protected void onShutdown() {
          // perform a final snapshot
          try {
            LOG.info("Writing final snapshot prior to shutdown");
            doSnapshot(true);
          } catch (IOException ioe) {
            LOG.error("Failed performing final snapshot on shutdown", ioe);
          }
        }

        @Override
        public long getSleepMillis() {
          return SNAPSHOT_POLL_INTERVAL;
        }
      };
      snapshotThread.start();
    }
  }
-
-  // Emits Transaction Data structures size as metrics
-  private void startMetricsThread() {
-    LOG.info("Starting periodic Metrics Emitter thread, frequency = " + METRICS_POLL_INTERVAL);
-    this.metricsThread = new DaemonThreadExecutor("tx-metrics") {
-      @Override
-      public void doRun() {
-        txMetricsCollector.gauge("committing.size", committingChangeSets.size());
-        txMetricsCollector.gauge("committed.size", committedChangeSets.size());
-        txMetricsCollector.gauge("inprogress.size", inProgress.size());
-        txMetricsCollector.gauge("invalid.size", invalidArray.length);
-      }
-
-      @Override
-      protected void onShutdown() {
-        // perform a final metrics emit
-        txMetricsCollector.gauge("committing.size", committingChangeSets.size());
-        txMetricsCollector.gauge("committed.size", committedChangeSets.size());
-        txMetricsCollector.gauge("inprogress.size", inProgress.size());
-        txMetricsCollector.gauge("invalid.size", invalidArray.length);
-      }
-
-      @Override
-      public long getSleepMillis() {
-        return METRICS_POLL_INTERVAL;
-      }
-    };
-    metricsThread.start();
-  }
-
  /**
   * Scans in-progress transactions and invalidates those whose expiration has passed
   * (or is negative, which indicates an incorrectly migrated transaction). The state
   * change happens under the manager's monitor; the WAL append happens outside it
   * but under the log read lock, so the current log cannot be rolled mid-append.
   */
  private void cleanupTimedOutTransactions() {
    List<TransactionEdit> invalidEdits = null;
    this.logReadLock.lock();
    try {
      synchronized (this) {
        if (!isRunning()) {
          return;
        }

        long currentTime = System.currentTimeMillis();
        List<Long> timedOut = Lists.newArrayList();
        for (Map.Entry<Long, InProgressTx> tx : inProgress.entrySet()) {
          long expiration = tx.getValue().getExpiration();
          if (expiration >= 0L && currentTime > expiration) {
            // timed out, remember tx id (can't remove while iterating over entries)
            timedOut.add(tx.getKey());
            LOG.info("Tx invalid list: added tx {} because of timeout", tx.getKey());
          } else if (expiration < 0) {
            LOG.warn("Transaction {} has negative expiration time {}. Likely cause is the transaction was not " +
                       "migrated correctly, this transaction will be expired immediately",
                     tx.getKey(), expiration);
            timedOut.add(tx.getKey());
          }
        }
        if (!timedOut.isEmpty()) {
          invalidEdits = Lists.newArrayListWithCapacity(timedOut.size());
          invalid.addAll(timedOut);
          for (long tx : timedOut) {
            committingChangeSets.remove(tx);
            inProgress.remove(tx);
            invalidEdits.add(TransactionEdit.createInvalid(tx));
          }

          // todo: find a more efficient way to keep this sorted. Could it just be an array?
          Collections.sort(invalid);
          invalidArray = invalid.toLongArray();
          LOG.info("Invalidated {} transactions due to timeout.", timedOut.size());
        }
      }
      // append outside the monitor to avoid holding it during log I/O
      if (invalidEdits != null) {
          appendToLog(invalidEdits);
      }
    } finally {
      this.logReadLock.unlock();
    }
  }
-
-  public synchronized TransactionSnapshot getSnapshot() throws IOException {
-    TransactionSnapshot snapshot = null;
-    if (!isRunning() && !isStopping()) {
-      return null;
-    }
-
-    long now = System.currentTimeMillis();
-    // avoid duplicate snapshots at same timestamp
-    if (now == lastSnapshotTime || (currentLog != null && now == currentLog.getTimestamp())) {
-      try {
-        TimeUnit.MILLISECONDS.sleep(1);
-      } catch (InterruptedException ie) { }
-    }
-    // copy in memory state
-    snapshot = getCurrentState();
-
-    LOG.debug("Starting snapshot of transaction state with timestamp {}", snapshot.getTimestamp());
-    LOG.debug("Returning snapshot of state: " + snapshot);
-    return snapshot;
-  }
-
-  /**
-   * Take a snapshot of the transaction state and serialize it into the given output stream.
-   * @return whether a snapshot was taken.
-   */
-  public boolean takeSnapshot(OutputStream out) throws IOException {
-    TransactionSnapshot snapshot = getSnapshot();
-    if (snapshot != null) {
-      persistor.writeSnapshot(out, snapshot);
-      return true;
-    } else {
-      return false;
-    }
-  }
-
  /**
   * Takes a snapshot of the transaction state, rolls the write-ahead log, persists
   * the snapshot, and prunes obsolete snapshots and logs. Any IOException aborts
   * the service rather than propagating.
   *
   * @param closing true when invoked during shutdown; then no new WAL is created
   *                and a null snapshot does not short-circuit the method
   */
  private void doSnapshot(boolean closing) throws IOException {
    long snapshotTime = 0L;
    TransactionSnapshot snapshot = null;
    TransactionLog oldLog = null;
    try {
      // hold the log write lock so no appends race with rolling the log
      this.logWriteLock.lock();
      try {
        synchronized (this) {
          snapshot = getSnapshot();
          if (snapshot == null && !closing) {
            return;
          }
          if (snapshot != null) {
            snapshotTime = snapshot.getTimestamp();
          }

          // roll WAL
          oldLog = currentLog;
          if (!closing) {
            currentLog = persistor.createLog(snapshot.getTimestamp());
          }
        }
        // there may not be an existing log on startup
        if (oldLog != null) {
          oldLog.close();
        }
      } finally {
        this.logWriteLock.unlock();
      }

      // save snapshot (outside the log write lock, so appends can proceed meanwhile)
      if (snapshot != null) {
        persistor.writeSnapshot(snapshot);
        lastSnapshotTime = snapshotTime;

        // clean any obsoleted snapshots and WALs
        long oldestRetainedTimestamp = persistor.deleteOldSnapshots(snapshotRetainCount);
        persistor.deleteLogsOlderThan(oldestRetainedTimestamp);
      }
    } catch (IOException ioe) {
      abortService("Snapshot (timestamp " + snapshotTime + ") failed due to: " + ioe.getMessage(), ioe);
    }
  }
-
  /**
   * Builds a copy of the current in-memory transaction state, stamped with the
   * current wall-clock time.
   */
  public synchronized TransactionSnapshot getCurrentState() {
    return TransactionSnapshot.copyFrom(System.currentTimeMillis(), readPointer, lastWritePointer,
                                        invalid, inProgress, committingChangeSets, committedChangeSets);
  }
-
  /**
   * Restores transaction state from the most recent persisted snapshot (if any)
   * and replays all WAL edits written since that snapshot. A read failure is
   * logged and rethrown unchecked, aborting startup.
   */
  public synchronized void recoverState() {
    try {
      TransactionSnapshot lastSnapshot = persistor.getLatestSnapshot();
      // if we failed before a snapshot could complete, we might not have one to restore
      if (lastSnapshot != null) {
        restoreSnapshot(lastSnapshot);
      }
      // replay any WALs since the last snapshot
      Collection<TransactionLog> logs = persistor.getLogsSince(lastSnapshotTime);
      if (logs != null) {
        replayLogs(logs);
      }
    } catch (IOException e) {
      LOG.error("Unable to read back transaction state:", e);
      throw Throwables.propagate(e);
    }
  }
-
  /**
   * Restore the initial in-memory transaction state from a snapshot. May only be
   * called while the in-memory state is still pristine (enforced by the
   * preconditions below), i.e. before any transactions have been processed.
   *
   * @param snapshot the persisted state to restore from
   */
  private void restoreSnapshot(TransactionSnapshot snapshot) {
    LOG.info("Restoring transaction state from snapshot at " + snapshot.getTimestamp());
    Preconditions.checkState(lastSnapshotTime == 0, "lastSnapshotTime has been set!");
    Preconditions.checkState(readPointer == 0, "readPointer has been set!");
    Preconditions.checkState(lastWritePointer == 0, "lastWritePointer has been set!");
    Preconditions.checkState(invalid.isEmpty(), "invalid list should be empty!");
    Preconditions.checkState(inProgress.isEmpty(), "inProgress map should be empty!");
    Preconditions.checkState(committingChangeSets.isEmpty(), "committingChangeSets should be empty!");
    Preconditions.checkState(committedChangeSets.isEmpty(), "committedChangeSets should be empty!");
    LOG.info("Restoring snapshot of state: " + snapshot);

    lastSnapshotTime = snapshot.getTimestamp();
    readPointer = snapshot.getReadPointer();
    lastWritePointer = snapshot.getWritePointer();
    invalid.addAll(snapshot.getInvalid());
    // migrate in-progress entries that predate expiration/type tracking
    inProgress.putAll(txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress()));
    committingChangeSets.putAll(snapshot.getCommittingChangeSets());
    committedChangeSets.putAll(snapshot.getCommittedChangeSets());
  }
-
  /**
   * Check if in-progress transactions need to be migrated to have expiration time and type, if so do the migration.
   * This is required for backwards compatibility, when long running transactions were represented
   * with expiration time -1. This can be removed when we stop supporting SnapshotCodec version 1.
   *
   * @param defaultLongTimeout configured timeout (seconds) for long transactions
   * @param longTimeoutTolerance fudge factor (milliseconds) used when matching an
   *                             expiration against the long timeout
   * @param inProgress map of in-progress transactions; entries are migrated in place
   * @return the same map instance, with migrated entries
   */
  public static Map<Long, InProgressTx> txnBackwardsCompatCheck(int defaultLongTimeout, long longTimeoutTolerance,
                                                                Map<Long, InProgressTx> inProgress) {
    for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) {
      long writePointer = entry.getKey();
      long expiration = entry.getValue().getExpiration();
      // LONG transactions will either have a negative expiration or expiration set to the long timeout
      // use a fudge factor on the expiration check, since expiration is set based on system time, not the write pointer
      if (entry.getValue().getType() == null &&
          (expiration < 0 ||
              (getTxExpirationFromWritePointer(writePointer, defaultLongTimeout) - expiration
                  < longTimeoutTolerance))) {
        // handle null expiration
        long newExpiration = getTxExpirationFromWritePointer(writePointer, defaultLongTimeout);
        InProgressTx compatTx =
          new InProgressTx(entry.getValue().getVisibilityUpperBound(), newExpiration, TransactionType.LONG,
              entry.getValue().getCheckpointWritePointers());
        entry.setValue(compatTx);
      } else if (entry.getValue().getType() == null) {
        InProgressTx compatTx =
          new InProgressTx(entry.getValue().getVisibilityUpperBound(), entry.getValue().getExpiration(),
                           TransactionType.SHORT, entry.getValue().getCheckpointWritePointers());
        entry.setValue(compatTx);
      }
    }
    return inProgress;
  }
-
-  /**
-   * Resets the state of the transaction manager.
-   */
-  public void resetState() {
-    this.logWriteLock.lock();
-    try {
-      // Take a snapshot before resetting the state, for debugging purposes
-      doSnapshot(false);
-      // Clear the state
-      clear();
-      // Take another snapshot: if no snapshot is taken after clearing the state
-      // and the manager is restarted, we will recover from the snapshot taken
-      // before resetting the state, which would be really bad
-      // This call will also init a new WAL
-      doSnapshot(false);
-    } catch (IOException e) {
-      LOG.error("Snapshot failed when resetting state!", e);
-      e.printStackTrace();
-    } finally {
-      this.logWriteLock.unlock();
-    }
-  }
-
  /**
   * Replay all logged edits from the given transaction logs. Edits that predate
   * expiration/type tracking are migrated on the fly; edits whose state cannot be
   * reconstructed safely cause the transaction to be invalidated.
   */
  private void replayLogs(Collection<TransactionLog> logs) {
    for (TransactionLog log : logs) {
      LOG.info("Replaying edits from transaction log " + log.getName());
      int editCnt = 0;
      try {
        TransactionLogReader reader = log.getReader();
        // reader may be null in the case of an empty file
        if (reader == null) {
          continue;
        }
        TransactionEdit edit = null;
        while ((edit = reader.next()) != null) {
          editCnt++;
          switch (edit.getState()) {
            case INPROGRESS:
              long expiration = edit.getExpiration();
              TransactionType type = edit.getType();
              // Check if transaction needs to be migrated to have expiration and type. Previous version of 
              // long running transactions were represented with expiration time as -1.
              // This can be removed when we stop supporting TransactionEditCodecV2.
              if (expiration < 0) {
                expiration = getTxExpirationFromWritePointer(edit.getWritePointer(), defaultLongTimeout);
                type = TransactionType.LONG;
              } else if (type == null) {
                type = TransactionType.SHORT;
              }
              addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(),
                                      expiration, type);
              break;
            case COMMITTING:
              addCommittingChangeSet(edit.getWritePointer(), edit.getChanges());
              break;
            case COMMITTED:
              // TODO: need to reconcile usage of transaction id v/s write pointer TEPHRA-140
              long transactionId = edit.getWritePointer();
              long[] checkpointPointers = edit.getCheckpointPointers();
              // the effective write pointer is the last checkpoint pointer, if any exist
              long writePointer = checkpointPointers == null || checkpointPointers.length == 0 ?
                transactionId : checkpointPointers[checkpointPointers.length - 1];
              doCommit(transactionId, writePointer, edit.getChanges(),
                       edit.getCommitPointer(), edit.getCanCommit());
              break;
            case INVALID:
              doInvalidate(edit.getWritePointer());
              break;
            case ABORTED:
              // note: 'type' is the same local declared in the INPROGRESS case (cases share scope)
              type = edit.getType();
              // Check if transaction edit needs to be migrated to have type. Previous versions of
              // ABORTED edits did not contain type.
              // This can be removed when we stop supporting TransactionEditCodecV2.
              if (type == null) {
                InProgressTx inProgressTx = inProgress.get(edit.getWritePointer());
                if (inProgressTx != null) {
                  type = inProgressTx.getType();
                } else {
                  // If transaction is not in-progress, then it has either been already aborted or invalidated.
                  // We cannot determine the transaction's state based on current information, to be safe invalidate it.
                  LOG.warn("Invalidating transaction {} as it's type cannot be determined during replay",
                           edit.getWritePointer());
                  doInvalidate(edit.getWritePointer());
                  break;
                }
              }
              doAbort(edit.getWritePointer(), edit.getCheckpointPointers(), type);
              break;
            case TRUNCATE_INVALID_TX:
              if (edit.getTruncateInvalidTxTime() != 0) {
                doTruncateInvalidTxBefore(edit.getTruncateInvalidTxTime());
              } else {
                doTruncateInvalidTx(edit.getTruncateInvalidTx());
              }
              break;
            case CHECKPOINT:
              doCheckpoint(edit.getWritePointer(), edit.getParentWritePointer());
              break;
            default:
              // unknown type!
              throw new IllegalArgumentException("Invalid state for WAL entry: " + edit.getState());
          }
        }
      } catch (IOException ioe) {
        throw Throwables.propagate(ioe);
      } catch (InvalidTruncateTimeException e) {
        throw Throwables.propagate(e);
      }
      LOG.info("Read " + editCnt + " edits from log " + log.getName());
    }
  }
-
-  @Override
-  public void doStop() {
-    Stopwatch timer = new Stopwatch().start();
-    LOG.info("Shutting down gracefully...");
-    // signal the cleanup thread to stop
-    if (cleanupThread != null) {
-      cleanupThread.shutdown();
-      try {
-        cleanupThread.join(30000L);
-      } catch (InterruptedException ie) {
-        LOG.warn("Interrupted waiting for cleanup thread to stop");
-        Thread.currentThread().interrupt();
-      }
-    }
-    if (metricsThread != null) {
-      metricsThread.shutdown();
-      try {
-        metricsThread.join(30000L);
-      } catch (InterruptedException ie) {
-        LOG.warn("Interrupted waiting for cleanup thread to stop");
-        Thread.currentThread().interrupt();
-      }
-    }
-    if (snapshotThread != null) {
-      // this will trigger a final snapshot on stop
-      snapshotThread.shutdown();
-      try {
-        snapshotThread.join(30000L);
-      } catch (InterruptedException ie) {
-        LOG.warn("Interrupted waiting for snapshot thread to stop");
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    persistor.stopAndWait();
-    txMetricsCollector.stop();
-    timer.stop();
-    LOG.info("Took " + timer + " to stop");
-    notifyStopped();
-  }
-
  /**
   * Immediately shuts down the service, without going through the normal close process.
   * No-op if the service is not currently running.
   * @param message A message describing the source of the failure.
   * @param error Any exception that caused the failure.
   */
  private void abortService(String message, Throwable error) {
    if (isRunning()) {
      LOG.error("Aborting transaction manager due to: " + message, error);
      notifyFailed(error);
    }
  }
-
  /**
   * Ensures the manager is running before serving a client request.
   *
   * @throws IllegalStateException if the transaction manager is not running
   */
  private void ensureAvailable() {
    Preconditions.checkState(isRunning(), "Transaction Manager is not running.");
  }
-
  /**
   * Start a short transaction with the default timeout.
   *
   * @return the new transaction
   */
  public Transaction startShort() {
    return startShort(defaultTimeout);
  }
-
-  /**
-   * Start a short transaction with a given timeout.
-   * @param timeoutInSeconds the time out period in seconds.
-   */
-  public Transaction startShort(int timeoutInSeconds) {
-    Preconditions.checkArgument(timeoutInSeconds > 0, "timeout must be positive but is %s", timeoutInSeconds);
-    txMetricsCollector.rate("start.short");
-    Stopwatch timer = new Stopwatch().start();
-    long expiration = getTxExpiration(timeoutInSeconds);
-    Transaction tx = startTx(expiration, TransactionType.SHORT);
-    txMetricsCollector.histogram("start.short.latency", (int) timer.elapsedMillis());
-    return tx;
-  }
-  
-  private static long getTxExpiration(long timeoutInSeconds) {
-    long currentTime = System.currentTimeMillis();
-    return currentTime + TimeUnit.SECONDS.toMillis(timeoutInSeconds);
-  }
-
  /**
   * Computes a transaction's expiration from its write pointer: the pointer encodes the
   * start time as writePointer / MAX_TX_PER_MS, to which the timeout is added.
   *
   * @param writePointer the transaction's write pointer (transaction id)
   * @param timeoutInSeconds timeout to add to the transaction's start time
   * @return the expiration timestamp in milliseconds
   */
  public static long getTxExpirationFromWritePointer(long writePointer, long timeoutInSeconds) {
    return writePointer / TxConstants.MAX_TX_PER_MS + TimeUnit.SECONDS.toMillis(timeoutInSeconds);
  }
-
  /**
   * Returns the next write pointer: sequential, but never less than the current time
   * scaled by MAX_TX_PER_MS, so tx ids stay aligned with wall-clock time.
   */
  private long getNextWritePointer() {
    // We want to align tx ids with current time. We assume that tx ids are sequential, but not less than
    // System.currentTimeMillis() * MAX_TX_PER_MS.
    return Math.max(lastWritePointer + 1, System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS);
  }
-
-  /**
-   * Start a long transaction. Long transactions and do not participate in conflict detection. Also, aborting a long
-   * transaction moves it to the invalid list because we assume that its writes cannot be rolled back.
-   */
-  public Transaction startLong() {
-    txMetricsCollector.rate("start.long");
-    Stopwatch timer = new Stopwatch().start();
-    long expiration = getTxExpiration(defaultLongTimeout);
-    Transaction tx = startTx(expiration, TransactionType.LONG);
-    txMetricsCollector.histogram("start.long.latency", (int) timer.elapsedMillis());
-    return tx;
-  }
-
-  private Transaction startTx(long expiration, TransactionType type) {
-    Transaction tx = null;
-    long txid;
-    // guard against changes to the transaction log while processing
-    this.logReadLock.lock();
-    try {
-      synchronized (this) {
-        ensureAvailable();
-        txid = getNextWritePointer();
-        tx = createTransaction(txid, type);
-        addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type);
-      }
-      // appending to WAL out of global lock for concurrent performance
-      // we should still be able to arrive at the same state even if log entries are out of order
-      appendToLog(TransactionEdit.createStarted(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type));
-    } finally {
-      this.logReadLock.unlock();
-    }
-    return tx;
-  }
-
-  private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound,
-                                       long expiration, TransactionType type) {
-    inProgress.put(writePointer, new InProgressTx(visibilityUpperBound, expiration, type));
-    advanceWritePointer(writePointer);
-  }
-
-  private void advanceWritePointer(long writePointer) {
-    // don't move the write pointer back if we have out of order transaction log entries
-    if (writePointer > lastWritePointer) {
-      lastWritePointer = writePointer;
-    }
-  }
-
-  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
-    txMetricsCollector.rate("canCommit");
-    Stopwatch timer = new Stopwatch().start();
-    if (inProgress.get(tx.getTransactionId()) == null) {
-      // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
-      if (invalid.contains(tx.getTransactionId())) {
-        throw new TransactionNotInProgressException(
-          String.format("canCommit() is called for transaction %d that is not in progress (it is known to be invalid)",
-                        tx.getTransactionId()));
-      } else {
-        throw new TransactionNotInProgressException(
-          String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId()));
-      }
-    }
-
-    Set<ChangeId> set = Sets.newHashSetWithExpectedSize(changeIds.size());
-    for (byte[] change : changeIds) {
-      set.add(new ChangeId(change));
-    }
-
-    if (hasConflicts(tx, set)) {
-      return false;
-    }
-    // guard against changes to the transaction log while processing
-    this.logReadLock.lock();
-    try {
-      synchronized (this) {
-        ensureAvailable();
-        addCommittingChangeSet(tx.getTransactionId(), set);
-      }
-      appendToLog(TransactionEdit.createCommitting(tx.getTransactionId(), set));
-    } finally {
-      this.logReadLock.unlock();
-    }
-    txMetricsCollector.histogram("canCommit.latency", (int) timer.elapsedMillis());
-    return true;
-  }
-
-  private void addCommittingChangeSet(long writePointer, Set<ChangeId> changes) {
-    committingChangeSets.put(writePointer, changes);
-  }
-
-  public boolean commit(Transaction tx) throws TransactionNotInProgressException {
-    txMetricsCollector.rate("commit");
-    Stopwatch timer = new Stopwatch().start();
-    Set<ChangeId> changeSet = null;
-    boolean addToCommitted = true;
-    long commitPointer;
-    // guard against changes to the transaction log while processing
-    this.logReadLock.lock();
-    try {
-      synchronized (this) {
-        ensureAvailable();
-        // we record commits at the first not-yet assigned transaction id to simplify clearing out change sets that
-        // are no longer visible by any in-progress transactions
-        commitPointer = lastWritePointer + 1;
-        if (inProgress.get(tx.getTransactionId()) == null) {
-          // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
-          if (invalid.contains(tx.getTransactionId())) {
-            throw new TransactionNotInProgressException(
-              String.format("canCommit() is called for transaction %d that is not in progress " +
-                              "(it is known to be invalid)", tx.getTransactionId()));
-          } else {
-            throw new TransactionNotInProgressException(
-              String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId()));
-          }
-        }
-
-        // these should be atomic
-        // NOTE: whether we succeed or not we don't need to keep changes in committing state: same tx cannot
-        //       be attempted to commit twice
-        changeSet = committingChangeSets.remove(tx.getTransactionId());
-
-        if (changeSet != null) {
-          // double-checking if there are conflicts: someone may have committed since canCommit check
-          if (hasConflicts(tx, changeSet)) {
-            return false;
-          }
-        } else {
-          // no changes
-          addToCommitted = false;
-        }
-        doCommit(tx.getTransactionId(), tx.getWritePointer(), changeSet, commitPointer, addToCommitted);
-      }
-      appendToLog(TransactionEdit.createCommitted(tx.getTransactionId(), changeSet, commitPointer, addToCommitted));
-    } finally {
-      this.logReadLock.unlock();
-    }
-    txMetricsCollector.histogram("commit.latency", (int) timer.elapsedMillis());
-    return true;
-  }
-
-  private void doCommit(long transactionId, long writePointer, Set<ChangeId> changes, long commitPointer,
-                        boolean addToCommitted) {
-    // In case this method is called when loading a previous WAL, we need to remove the tx from these sets
-    committingChangeSets.remove(transactionId);
-    if (addToCommitted && !changes.isEmpty()) {
-      // No need to add empty changes to the committed change sets, they will never trigger any conflict
-
-      // Record the committed change set with the next writePointer as the commit time.
-      // NOTE: we use current next writePointer as key for the map, hence we may have multiple txs changesets to be
-      //       stored under one key
-      Set<ChangeId> changeIds = committedChangeSets.get(commitPointer);
-      if (changeIds != null) {
-        // NOTE: we modify the new set to prevent concurrent modification exception, as other threads (e.g. in
-        // canCommit) use it unguarded
-        changes.addAll(changeIds);
-      }
-      committedChangeSets.put(commitPointer, changes);
-    }
-    // remove from in-progress set, so that it does not get excluded in the future
-    InProgressTx previous = inProgress.remove(transactionId);
-    if (previous == null) {
-      // tx was not in progress! perhaps it timed out and is invalid? try to remove it there.
-      if (invalid.rem(transactionId)) {
-        invalidArray = invalid.toLongArray();
-        LOG.info("Tx invalid list: removed committed tx {}", transactionId);
-      }
-    }
-    // moving read pointer
-    moveReadPointerIfNeeded(writePointer);
-
-    // All committed change sets that are smaller than the earliest started transaction can be removed.
-    // here we ignore transactions that have no timeout, they are long-running and don't participate in
-    // conflict detection.
-    // TODO: for efficiency, can we do this once per-log in replayLogs instead of once per edit?
-    committedChangeSets.headMap(TxUtils.getFirstShortInProgress(inProgress)).clear();
-  }
-
-  public void abort(Transaction tx) {
-    // guard against changes to the transaction log while processing
-    txMetricsCollector.rate("abort");
-    Stopwatch timer = new Stopwatch().start();
-    this.logReadLock.lock();
-    try {
-      synchronized (this) {
-        ensureAvailable();
-        doAbort(tx.getTransactionId(), tx.getCheckpointWritePointers(), tx.getType());
-      }
-      appendToLog(TransactionEdit.createAborted(tx.getTransactionId(), tx.getType(), tx.getCheckpointWritePointers()));
-      txMetricsCollector.histogram("abort.latency", (int) timer.elapsedMillis());
-    } finally {
-      this.logReadLock.unlock();
-    }
-  }
-
-  private void doAbort(long writePointer, long[] checkpointWritePointers, TransactionType type) {
-    committingChangeSets.remove(writePointer);
-    
-    if (type == TransactionType.LONG) {
-      // Long running transactions cannot be aborted as their change sets are not saved, 
-      // and hence the changes cannot be rolled back. Invalidate the long running transaction instead.
-      doInvalidate(writePointer);
-      return;
-    }
-    
-    // makes tx visible (assumes that all operations were rolled back)
-    // remove from in-progress set, so that it does not get excluded in the future
-    InProgressTx removed = inProgress.remove(writePointer);
-    if (removed == null) {
-      // tx was not in progress! perhaps it timed out and is invalid? try to remove it there.
-      if (invalid.rem(writePointer)) {
-        // remove any invalidated checkpoint pointers
-        // this will only be present if the parent write pointer was also invalidated
-        if (checkpointWritePointers != null) {
-          for (int i = 0; i < checkpointWritePointers.length; i++) {
-            invalid.rem(checkpointWritePointers[i]);
-          }
-        }
-        invalidArray = invalid.toLongArray();
-        LOG.info("Tx invalid list: removed aborted tx {}", writePointer);
-        // removed a tx from excludes: must move read pointer
-        moveReadPointerIfNeeded(writePointer);
-      }
-    } else {
-      // removed a tx from excludes: must move read pointer
-      moveReadPointerIfNeeded(writePointer);
-    }
-  }
-
-  public boolean invalidate(long tx) {
-    // guard against changes to the transaction log while processing
-    txMetricsCollector.rate("invalidate");
-    Stopwatch timer = new Stopwatch().start();
-    this.logReadLock.lock();
-    try {
-      boolean success;
-      synchronized (this) {
-        ensureAvailable();
-        success = doInvalidate(tx);
-      }
-      appendToLog(TransactionEdit.createInvalid(tx));
-      txMetricsCollector.histogram("invalidate.latency", (int) timer.elapsedMillis());
-      return success;
-    } finally {
-      this.logReadLock.unlock();
-    }
-  }
-
-  private boolean doInvalidate(long writePointer) {
-    Set<ChangeId> previousChangeSet = committingChangeSets.remove(writePointer);
-    // remove from in-progress set, so that it does not get excluded in the future
-    InProgressTx previous = inProgress.remove(writePointer);
-    // This check is to prevent from invalidating committed transactions
-    if (previous != null || previousChangeSet != null) {
-      // add tx to invalids
-      invalid.add(writePointer);
-      if (previous == null) {
-        LOG.debug("Invalidating tx {} in committing change sets but not in-progress", writePointer);
-      } else {
-        // invalidate any checkpoint write pointers
-        LongArrayList childWritePointers = previous.getCheckpointWritePointers();
-        if (childWritePointers != null) {
-          for (int i = 0; i < childWritePointers.size(); i++) {
-            invalid.add(childWritePointers.get(i));
-          }
-        }
-      }
-      LOG.info("Tx invalid list: added tx {} because of invalidate", writePointer);
-      // todo: find a more efficient way to keep this sorted. Could it just be an array?
-      Collections.sort(invalid);
-      invalidArray = invalid.toLongArray();
-      if (previous != null && !previous.isLongRunning()) {
-        // tx was short-running: must move read pointer
-        moveReadPointerIfNeeded(writePointer);
-      }
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Removes the given transaction ids from the invalid list.
-   * @param invalidTxIds transaction ids
-   * @return true if invalid list got changed, false otherwise
-   */
-  public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
-    // guard against changes to the transaction log while processing
-    txMetricsCollector.rate("truncateInvalidTx");
-    Stopwatch timer = new Stopwatch().start();
-    this.logReadLock.lock();
-    try {
-      boolean success;
-      synchronized (this) {
-        ensureAvailable();
-        success = doTruncateInvalidTx(invalidTxIds);
-      }
-      appendToLog(TransactionEdit.createTruncateInvalidTx(invalidTxIds));
-      txMetricsCollector.histogram("truncateInvalidTx.latency", (int) timer.elapsedMillis());
-      return success;
-    } finally {
-      this.logReadLock.unlock();
-    }
-  }
-
-  private boolean doTruncateInvalidTx(Set<Long> invalidTxIds) {
-    LOG.info("Removing tx ids {} from invalid list", invalidTxIds);
-    boolean success = invalid.removeAll(invalidTxIds);
-    if (success) {
-      invalidArray = invalid.toLongArray();
-    }
-    return success;
-  }
-
-  /**
-   * Removes all transaction ids started before the given time from invalid list.
-   * @param time time in milliseconds
-   * @return true if invalid list got changed, false otherwise
-   * @throws InvalidTruncateTimeException if there are any in-progress transactions started before given time
-   */
-  public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
-    // guard against changes to the transaction log while processing
-    txMetricsCollector.rate("truncateInvalidTxBefore");
-    Stopwatch timer = new Stopwatch().start();
-    this.logReadLock.lock();
-    try {
-      boolean success;
-      synchronized (this) {
-        ensureAvailable();
-        success = doTruncateInvalidTxBefore(time);
-      }
-      appendToLog(TransactionEdit.createTruncateInvalidTxBefore(time));
-      txMetricsCollector.histogram("truncateInvalidTxBefore.latency", (int) timer.elapsedMillis());
-      return success;
-    } finally {
-      this.logReadLock.unlock();
-    }
-  }
-  
-  private boolean doTruncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
-    LOG.info("Removing tx ids before {} from invalid list", time);
-    long truncateWp = time * TxConstants.MAX_TX_PER_MS;
-    // Check if there any in-progress transactions started earlier than truncate time
-    if (inProgress.lowerKey(truncateWp) != null) {
-      throw new InvalidTruncateTimeException("Transactions started earlier than " + time + " are in-progress");
-    }
-    
-    // Find all invalid transactions earlier than truncateWp
-    Set<Long> toTruncate = Sets.newHashSet();
-    for (long wp : invalid) {
-      // invalid list is sorted, hence can stop as soon as we reach a wp >= truncateWp
-      if (wp >= truncateWp) {
-        break;
-      }
-      toTruncate.add(wp);
-    }
-    return doTruncateInvalidTx(toTruncate);
-  }
-
-  public Transaction checkpoint(Transaction originalTx) throws TransactionNotInProgressException {
-    txMetricsCollector.rate("checkpoint");
-    Stopwatch timer = new Stopwatch().start();
-
-    Transaction checkpointedTx = null;
-    long txId = originalTx.getTransactionId();
-    long newWritePointer = 0;
-    // guard against changes to the transaction log while processing
-    this.logReadLock.lock();
-    try {
-      synchronized (this) {
-        ensureAvailable();
-        // check that the parent tx is in progress
-        InProgressTx parentTx = inProgress.get(txId);
-        if (parentTx == null) {
-          if (invalid.contains(txId)) {
-            throw new TransactionNotInProgressException(
-                String.format("Transaction %d is not in progress because it was invalidated", txId));
-          } else {
-            throw new TransactionNotInProgressException(
-                String.format("Transaction %d is not in progress", txId));
-          }
-        }
-        newWritePointer = getNextWritePointer();
-        doCheckpoint(newWritePointer, txId);
-        // create a new transaction with the same read snapshot, plus the additional checkpoint write pointer
-        // the same read snapshot is maintained to
-        checkpointedTx = new Transaction(originalTx, newWritePointer,
-            parentTx.getCheckpointWritePointers().toLongArray());
-      }
-      // appending to WAL out of global lock for concurrent performance
-      // we should still be able to arrive at the same state even if log entries are out of order
-      appendToLog(TransactionEdit.createCheckpoint(newWritePointer, txId));
-    } finally {
-      this.logReadLock.unlock();
-    }
-    txMetricsCollector.histogram("checkpoint.latency", (int) timer.elapsedMillis());
-
-    return checkpointedTx;
-  }
-
-  private void doCheckpoint(long newWritePointer, long parentWritePointer) {
-    InProgressTx existingTx = inProgress.get(parentWritePointer);
-    existingTx.addCheckpointWritePointer(newWritePointer);
-    advanceWritePointer(newWritePointer);
-  }
-  
-  // hack for exposing important metric
-  public int getExcludedListSize() {
-    return invalid.size() + inProgress.size();
-  }
-
-  /**
-   * @return the size of invalid list
-   */
-  public int getInvalidSize() {
-    return this.invalid.size();
-  }
-
-  int getCommittedSize() {
-    return this.committedChangeSets.size();
-  }
-
-  private boolean hasConflicts(Transaction tx, Set<ChangeId> changeIds) {
-    if (changeIds.isEmpty()) {
-      return false;
-    }
-
-    for (Map.Entry<Long, Set<ChangeId>> changeSet : committedChangeSets.entrySet()) {
-      // If commit time is greater than tx read-pointer,
-      // basically not visible but committed means "tx committed after given tx was started"
-      if (changeSet.getKey() > tx.getTransactionId()) {
-        if (overlap(changeSet.getValue(), changeIds)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  private boolean overlap(Set<ChangeId> a, Set<ChangeId> b) {
-    // iterate over the smaller set, and check for every element in the other set
-    if (a.size() > b.size()) {
-      for (ChangeId change : b) {
-        if (a.contains(change)) {
-          return true;
-        }
-      }
-    } else {
-      for (ChangeId change : a) {
-        if (b.contains(change)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  private void moveReadPointerIfNeeded(long committedWritePointer) {
-    if (committedWritePointer > readPointer) {
-      readPointer = committedWritePointer;
-    }
-  }
-
-  /**
-   * Creates a new Transaction. This method only get called from start transaction, which is already
-   * synchronized.
-   */
-  private Transaction createTransaction(long writePointer, TransactionType type) {
-    // For holding the first in progress short transaction Id (with timeout >= 0).
-    long firstShortTx = Transaction.NO_TX_IN_PROGRESS;
-    LongArrayList inProgressIds = new LongArrayList(inProgress.size());
-    for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) {
-      long txId = entry.getKey();
-      inProgressIds.add(txId);
-      // add any checkpointed write pointers to the in-progress list
-      LongArrayList childIds = entry.getValue().getCheckpointWritePointers();
-      if (childIds != null) {
-        for (int i = 0; i < childIds.size(); i++) {
-          inProgressIds.add(childIds.get(i));
-        }
-      }
-      if (firstShortTx == Transaction.NO_TX_IN_PROGRESS && !entry.getValue().isLongRunning()) {
-        firstShortTx = txId;
-      }
-    }
-
-    return new Transaction(readPointer, writePointer, invalidArray, inProgressIds.toLongArray(), firstShortTx, type);
-  }
-
-  private void appendToLog(TransactionEdit edit) {
-    try {
-      Stopwatch timer = new Stopwatch().start();
-      currentLog.append(edit);
-      txMetricsCollector.rate("wal.append.count");
-      txMetricsCollector.histogram("wal.append.latency", (int) timer.elapsedMillis());
-    } catch (IOException ioe) {
-      abortService("Error appending to transaction log", ioe);
-    }
-  }
-
-  private void appendToLog(List<TransactionEdit> edits) {
-    try {
-      Stopwatch timer = new Stopwatch().start();
-      currentLog.append(edits);
-      txMetricsCollector.rate("wal.append.count", edits.size());
-      txMetricsCollector.histogram("wal.append.latency", (int) timer.elapsedMillis());
-    } catch (IOException ioe) {
-      abortService("Error appending to transaction log", ioe);
-    }
-  }
-
-  /**
-   * Called from the tx service every 10 seconds.
-   * This hack is needed because current metrics system is not flexible when it comes to adding new metrics.
-   */
-  public void logStatistics() {
-    LOG.info("Transaction Statistics: write pointer = " + lastWritePointer +
-               ", invalid = " + invalid.size() +
-               ", in progress = " + inProgress.size() +
-               ", committing = " + committingChangeSets.size() +
-               ", committed = " + committedChangeSets.size());
-  }
-
-  private abstract static class DaemonThreadExecutor extends Thread {
-    private AtomicBoolean stopped = new AtomicBoolean(false);
-
-    public DaemonThreadExecutor(String name) {
-      super(name);
-      setDaemon(true);
-    }
-
-    public void run() {
-      try {
-        while (!isInterrupted() && !stopped.get()) {
-          doRun();
-          synchronized (stopped) {
-            stopped.wait(getSleepMillis());
-          }
-        }
-      } catch (InterruptedException ie) {
-        LOG.info("Interrupted thread " + getName());
-      }
-      // perform any final cleanup
-      onShutdown();
-      LOG.info("Exiting thread " + getName());
-    }
-
-    public abstract void doRun();
-
-    protected abstract long getSleepMillis();
-
-    protected void onShutdown() {
-    }
-
-    public void shutdown() {
-      if (stopped.compareAndSet(false, true)) {
-        synchronized (stopped) {
-          stopped.notifyAll();
-        }
-      }
-    }
-  }
-
-  /**
-   * Represents some of the info on in-progress tx
-   */
-  public static final class InProgressTx {
-    /** the oldest in progress tx at the time of this tx start */
-    private final long visibilityUpperBound;
-    private final long expiration;
-    private final TransactionType type;
-    private LongArrayList checkpointWritePointers = new LongArrayList();
-
-    public InProgressTx(long visibilityUpperBound, long expiration, TransactionType type) {
-      this(visibilityUpperBound, expiration, type, new LongArrayList());
-    }
-
-    public InProgressTx(long visibilityUpperBound, long expiration, TransactionType type,
-                        LongArrayList checkpointWritePointers) {
-      this.visibilityUpperBound = visibilityUpperBound;
-      this.expiration = expiration;
-      this.type = type;
-      this.checkpointWritePointers = checkpointWritePointers;
-    }
-
-    // For backwards compatibility when long running txns were represented with -1 expiration
-    @Deprecated
-    public InProgressTx(long visibilityUpperBound, long expiration) {
-      this(visibilityUpperBound, expiration, null);
-    }
-
-    public long getVisibilityUpperBound() {
-      return visibilityUpperBound;
-    }
-
-    public long getExpiration() {
-      return expiration;
-    }
-
-    @Nullable
-    public TransactionType getType() {
-      return type;
-    }
-
-    public boolean isLongRunning() {
-      if (type == null) {
-        // for backwards compatibility when long running txns were represented with -1 expiration
-        return expiration == -1;
-      }
-      return type == TransactionType.LONG;
-    }
-
-    public void addCheckpointWritePointer(long checkpointWritePointer) {
-      checkpointWritePointers.add(checkpointWritePointer);
-    }
-
-    public LongArrayList getCheckpointWritePointers() {
-      return checkpointWritePointers;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == null || !(o instanceof InProgressTx)) {
-        return false;
-      }
-
-      if (this == o) {
-        return true;
-      }
-
-      InProgressTx other = (InProgressTx) o;
-      return Objects.equal(visibilityUpperBound, other.getVisibilityUpperBound()) &&
-          Objects.equal(expiration, other.getExpiration()) &&
-          Objects.equal(type, other.type) &&
-          Objects.equal(checkpointWritePointers, other.checkpointWritePointers);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(visibilityUpperBound, expiration, type, checkpointWritePointers);
-    }
-
-    @Override
-    public String toString() {
-      return Objects.toStringHelper(this)
-          .add("visibilityUpperBound", visibilityUpperBound)
-          .add("expiration", expiration)
-          .add("type", type)
-          .add("checkpointWritePointers", checkpointWritePointers)
-          .toString();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionServiceMain.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionServiceMain.java b/tephra-core/src/main/java/co/cask/tephra/TransactionServiceMain.java
deleted file mode 100644
index 8437017..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionServiceMain.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.distributed.TransactionService;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import co.cask.tephra.util.ConfigurationFactory;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-
-/**
- * Driver class to start and stop tx in distributed mode.
- */
-public class TransactionServiceMain {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TransactionServiceMain.class);
-
-  private Configuration conf;
-  private TransactionService txService;
-
-  public static void main(String args[]) throws Exception {
-    TransactionServiceMain instance = new TransactionServiceMain();
-    instance.doMain(args);
-  }
-
-  public TransactionServiceMain() {
-    this(null);
-  }
-
-  public TransactionServiceMain(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * The main method. It simply call methods in the same sequence
-   * as if the program is started by jsvc.
-   */
-  public void doMain(final String[] args) throws Exception {
-    final CountDownLatch shutdownLatch = new CountDownLatch(1);
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-      try {
-        try {
-          TransactionServiceMain.this.stop();
-        } finally {
-          try {
-            TransactionServiceMain.this.destroy();
-          } finally {
-            shutdownLatch.countDown();
-          }
-        }
-      } catch (Throwable t) {
-        LOG.error("Exception when shutting down: " + t.getMessage(), t);
-      }
-      }
-    });
-    init(args);
-    start();
-
-    shutdownLatch.await();
-  }
-
-  /**
-   * Invoked by jsvc to initialize the program.
-   */
-  public void init(String[] args) {
-    if (conf == null) {
-      conf = new ConfigurationFactory().get();
-    }
-  }
-
-  /**
-   * Invoked by jsvc to start the program.
-   */
-  public void start() throws Exception {
-    Injector injector = Guice.createInjector(
-      new ConfigModule(conf),
-      new ZKModule(),
-      new DiscoveryModules().getDistributedModules(),
-      new TransactionModules().getDistributedModules(),
-      new TransactionClientModule()
-    );
-
-    ZKClientService zkClientService = injector.getInstance(ZKClientService.class);
-    zkClientService.startAndWait();
-
-    // start a tx server
-    txService = injector.getInstance(TransactionService.class);
-    try {
-      LOG.info("Starting {}", getClass().getSimpleName());
-      txService.startAndWait();
-    } catch (Exception e) {
-      System.err.println("Failed to start service: " + e.getMessage());
-    }
-  }
-
-  /**
-   * Invoked by jsvc to stop the program.
-   */
-  public void stop() {
-    LOG.info("Stopping {}", getClass().getSimpleName());
-    if (txService == null) {
-      return;
-    }
-    try {
-      if (txService.isRunning()) {
-        txService.stopAndWait();
-      }
-    } catch (Throwable e) {
-      LOG.error("Failed to shutdown transaction service.", e);
-      // because shutdown hooks execute concurrently, the logger may be closed already: thus also print it.
-      System.err.println("Failed to shutdown transaction service: " + e.getMessage());
-      e.printStackTrace(System.err);
-    }
-  }
-
-  /**
-   * Invoked by jsvc for resource cleanup.
-   */
-  public void destroy() { }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionSystemClient.java b/tephra-core/src/main/java/co/cask/tephra/TransactionSystemClient.java
deleted file mode 100644
index ca68143..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionSystemClient.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * Client talking to transaction system.
- * See also {@link co.cask.tephra.TransactionAware}.
- * todo: explain Omid.
- */
-public interface TransactionSystemClient {
-  /**
-   * Starts new short transaction.
-   * @return instance of {@link co.cask.tephra.Transaction}
-   */
-  // TODO: "short" and "long" are very misleading names. Use transaction attributes like "detect conflicts or not", etc.
-  Transaction startShort();
-
-  /**
-   * Starts new short transaction.
-   * @param timeout the timeout for the transaction
-   * @return instance of {@link Transaction}
-   */
-  Transaction startShort(int timeout);
-
-  /**
-   * Starts new long transaction.
-   * @return instance of {@link Transaction}
-   */
-  Transaction startLong();
-
-  // this pre-commit detects conflicts with other transactions committed so far
-  // NOTE: the changes set should not change after this operation, this may help us do some extra optimizations
-  // NOTE: there should be time constraint on how long does it take to commit changes by the client after this operation
-  //       is submitted so that we can cleanup related resources
-  // NOTE: as of now you can call this method multiple times, each time the changeSet of tx will be updated. Not sure
-  //       if we can call it a feature or a side-affect of implementation. It makes more sense to append changeset, but
-  //       before we really need it we don't do it because it will slow down tx manager throughput.
-
-  /**
-   * Checks if transaction with the set of changes can be committed. E.g. it can check conflicts with other changes and
-   * refuse commit if there are conflicts. It is assumed that no other changes will be done in between this method call
-   * and {@link #commit(Transaction)} which may check conflicts again to avoid races.
-   * <p/>
-   * Since we do conflict detection at commit time as well, this may seem redundant. The idea is to check for conflicts
-   * before we persist changes to avoid rollback in case of conflicts as much as possible.
-   * NOTE: in some situations we may want to skip this step to save on RPC with a risk of many rollback ops. So by
-   *       default we take safe path.
-   *
-   * @param tx transaction to verify
-   * @param changeIds ids of changes made by transaction
-   * @return true if transaction can be committed otherwise false
-   */
-  boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException;
-
-  /**
-   * Makes transaction visible. It will again check conflicts of changes submitted previously with
-   * {@link #canCommit(Transaction, java.util.Collection)}
-   * @param tx transaction to make visible.
-   * @return true if transaction can be committed otherwise false
-   */
-  boolean commit(Transaction tx) throws TransactionNotInProgressException;
-
-  /**
-   * Makes transaction visible. You should call it only when all changes of this tx are undone.
-   * NOTE: it will not throw {@link TransactionNotInProgressException} if transaction has timed out.
-   * @param tx transaction to make visible.
-   */
-  void abort(Transaction tx);
-
-  /**
-   * Makes transaction invalid. You should call it if not all changes of this tx could be undone.
-   * NOTE: it will not throw {@link TransactionNotInProgressException} if transaction has timed out.
-   * @param tx transaction id to invalidate.
-   * @return true if transaction has been successfully invalidated
-   */
-  boolean invalidate(long tx);
-
-  /**
-   * Performs a checkpoint operation on the current transaction, returning a new Transaction instance with the
-   * updated state.  A checkpoint operation assigns a new write pointer for the current transaction.
-   * @param tx the current transaction to checkpoint
-   * @return an updated transaction instance with the new write pointer
-   */
-  Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException;
-
-  /**
-   * Retrieves the state of the transaction manager and send it as a stream. The snapshot will not be persisted.
-   * @return an input stream containing an encoded snapshot of the transaction manager
-   */
-  InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException;
-
-  /**
-   * Return the status of the transaction Manager
-   * @return a String which denotes the status of txManager
-   */
-  String status();
-
-  /**
-   * Resets the state of the transaction manager.
-   */
-  void resetState();
-
-  /**
-   * Removes the given transaction ids from the invalid list. 
-   * @param invalidTxIds transaction ids
-   * @return true if invalid list got changed, false otherwise
-   */
-  boolean truncateInvalidTx(Set<Long> invalidTxIds);
-
-  /**
-   * Removes all transaction ids started before the given time from invalid list.
-   * @param time time in milliseconds
-   * @return true if invalid list got changed, false otherwise
-   * @throws InvalidTruncateTimeException if there are any in-progress transactions started before given time
-   */
-  boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException;
-
-  /**
-   * @return the size of invalid list
-   */
-  int getInvalidSize();
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TxConstants.java b/tephra-core/src/main/java/co/cask/tephra/TxConstants.java
deleted file mode 100644
index 7c96c85..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TxConstants.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.snapshot.DefaultSnapshotCodec;
-import co.cask.tephra.snapshot.SnapshotCodecV2;
-import co.cask.tephra.snapshot.SnapshotCodecV3;
-import co.cask.tephra.snapshot.SnapshotCodecV4;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Transaction system constants
- */
-public class TxConstants {
-  /**
-   * Defines what level of conflict detection should be used for transactions.  {@code ROW} means that only the
-   * table name and row key for each change will be used to determine if the transaction change sets conflict.
-   * {@code COLUMN} means that the table name, row key, column family, and column qualifier will all be used to
-   * identify write conflicts.  {@code NONE} means that no conflict detection will be performed, but transaction
-   * clients will still track the current transaction's change set to rollback any persisted changes in the event of
-   * a failure.  This should only be used where writes to the same coordinate should never conflict, such as
-   * append-only data.  The default value used by {@code TransactionAwareHTable} implementations is {@code COLUMN}.
-   *
-   * <p>
-   * <strong>Note: for a given table, all clients must use the same conflict detection setting!</strong>
-   * Otherwise conflicts will not be flagged correctly.
-   * </p>
-   */
-  public enum ConflictDetection {
-    ROW,
-    COLUMN,
-    NONE
-  }
-
-  /**
-   * Property set for {@code org.apache.hadoop.hbase.HColumnDescriptor} to configure time-to-live on data within
-   * the column family.  The value given is in milliseconds.  Once a cell's data has surpassed the given value in age,
-   * the cell's data will no longer be visible and may be garbage collected.
-   */
-  public static final String PROPERTY_TTL = "dataset.table.ttl";
-
-  /**
-   * This is how many tx we allow per millisecond, if you care about the system for 100 years:
-   * Long.MAX_VALUE / (System.currentTimeMillis() + TimeUnit.DAYS.toMillis(365 * 100)) =
-   * (as of Feb 20, 2014) 2,028,653. It is safe and convenient to use 1,000,000 as multiplier:
-   * <ul>
-   *   <li>
-   *     we hardly can do more than 1 billion txs per second
-   *   </li>
-   *   <li>
-   *     long value will not overflow for 200 years
-   *   </li>
-   *   <li>
-   *     makes reading & debugging easier if multiplier is 10^n
-   *   </li>
-   * </ul>
-   */
-  public static final long MAX_TX_PER_MS = 1000000;
-
-  /**
-   * Since HBase {@code Delete} operations cannot be undone at the same timestamp, "deleted" data is instead
-   * overwritten with an empty {@code byte[]} to flag it as removed.  Cells with empty values will be filtered out
-   * of the results for read operations.  If cells with empty values should be included in results (meaning data
-   * cannot be transactionally deleted), then set this configuration property to true.
-   */
-  public static final String ALLOW_EMPTY_VALUES_KEY = "data.tx.allow.empty.values";
-  public static final boolean ALLOW_EMPTY_VALUES_DEFAULT = false;
-
-  /**
-   * Key used to set the serialized transaction as an attribute on Get and Scan operations.
-   */
-  public static final String TX_OPERATION_ATTRIBUTE_KEY = "cask.tx";
-  /**
-   * Key used to flag a delete operation as part of a transaction rollback.  This is used so that the
-   * {@code TransactionProcessor} coprocessor loaded on a table can differentiate between deletes issued
-   * as part of a normal client operation versus those performed when rolling back a transaction.
-   */
-  public static final String TX_ROLLBACK_ATTRIBUTE_KEY = "cask.tx.rollback";
-
-  /**
-   * Column qualifier used for a special delete marker tombstone, which identifies an entire column family as deleted.
-   */
-  public static final byte[] FAMILY_DELETE_QUALIFIER = new byte[0];
-
-  // Constants for monitoring status
-  public static final String STATUS_OK = "OK";
-  public static final String STATUS_NOTOK = "NOTOK";
-
-  /**
-   * Indicates whether data written before Tephra was enabled on a table should be readable. Reading non-transactional
-   * data can lead to slight performance penalty. Hence it is disabled by default.
-   * @see <a href="https://issues.cask.co/browse/TEPHRA-89">TEPHRA-89</a>
-   */
-  public static final String READ_NON_TX_DATA = "data.tx.read.pre.existing";
-  public static final boolean DEFAULT_READ_NON_TX_DATA = false;
-
-  /**
-   * TransactionManager configuration.
-   */
-  public static final class Manager {
-    // TransactionManager configuration
-    public static final String CFG_DO_PERSIST = "tx.persist";
-    /** Directory in HDFS used for transaction snapshot and log storage. */
-    public static final String CFG_TX_SNAPSHOT_DIR = "data.tx.snapshot.dir";
-    /** Directory on the local filesystem used for transaction snapshot and log storage. */
-    public static final String CFG_TX_SNAPSHOT_LOCAL_DIR = "data.tx.snapshot.local.dir";
-    /** How often to clean up timed out transactions, in seconds, or 0 for no cleanup. */
-    public static final String CFG_TX_CLEANUP_INTERVAL = "data.tx.cleanup.interval";
-    /** The user id to access HDFS if not running in secure HDFS. */
-    public static final String CFG_TX_HDFS_USER = "data.tx.hdfs.user";
-    /** Default value for how often to check in-progress transactions for expiration, in seconds. */
-    public static final int DEFAULT_TX_CLEANUP_INTERVAL = 10;
-    /**
-     * The timeout for a transaction, in seconds. If the transaction is not finished in that time,
-     * it is marked invalid.
-     */
-    public static final String CFG_TX_TIMEOUT = "data.tx.timeout";
-    /** Default value for transaction timeout, in seconds. */
-    public static final int DEFAULT_TX_TIMEOUT = 30;
-    /**
-     * The timeout for a long running transaction, in seconds. If the transaction is not finished in that time,
-     * it is marked invalid.
-     */
-    public static final String CFG_TX_LONG_TIMEOUT = "data.tx.long.timeout";
-    /** Default value for long running transaction timeout, in seconds. */
-    public static final int DEFAULT_TX_LONG_TIMEOUT = (int) TimeUnit.DAYS.toSeconds(1);
-    /** The frequency (in seconds) to perform periodic snapshots, or 0 for no periodic snapshots. */
-    public static final String CFG_TX_SNAPSHOT_INTERVAL = "data.tx.snapshot.interval";
-    /** Default value for frequency of periodic snapshots of transaction state. */
-    public static final long DEFAULT_TX_SNAPSHOT_INTERVAL = 300;
-    /** Number of most recent transaction snapshots to retain. */
-    public static final String CFG_TX_SNAPSHOT_RETAIN = "data.tx.snapshot.retain";
-    /** Default value for number of most recent snapshots to retain. */
-    public static final int DEFAULT_TX_SNAPSHOT_RETAIN = 10;
-  }
-
-  /**
-   * TransactionService configuration.
-   */
-  public static final class Service {
-
-    /** for the zookeeper quorum string for leader election for tx server. */
-    public static final String CFG_DATA_TX_ZOOKEEPER_QUORUM
-      = "data.tx.zookeeper.quorum";
-
-    /** for the name used to announce service availability to discovery service */
-    public static final String CFG_DATA_TX_DISCOVERY_SERVICE_NAME
-      = "data.tx.discovery.service.name";
-
-    /** for the port of the tx server. */
-    public static final String CFG_DATA_TX_BIND_PORT
-      = "data.tx.bind.port";
-
-    /** for the address (hostname) of the tx server. */
-    public static final String CFG_DATA_TX_BIND_ADDRESS
-      = "data.tx.bind.address";
-
-    /** the number of IO threads in the tx service. */
-    public static final String CFG_DATA_TX_SERVER_IO_THREADS
-      = "data.tx.server.io.threads";
-
-    /** the number of handler threads in the tx service. */
-    public static final String CFG_DATA_TX_SERVER_THREADS
-      = "data.tx.server.threads";
-
-    public static final String CFG_DATA_TX_THRIFT_MAX_READ_BUFFER
-      = "data.tx.thrift.max.read.buffer";
-
-    public static final String DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME
-      = "transaction";
-
-    /** default tx service port. */
-    public static final int DEFAULT_DATA_TX_BIND_PORT
-      = 15165;
-
-    /** default tx service address. */
-    public static final String DEFAULT_DATA_TX_BIND_ADDRESS
-      = "0.0.0.0";
-
-    /** default number of handler IO threads in tx service. */
-    public static final int DEFAULT_DATA_TX_SERVER_IO_THREADS
-      = 2;
-
-    /** default number of handler threads in tx service. */
-    public static final int DEFAULT_DATA_TX_SERVER_THREADS
-      = 20;
-
-    /** default thrift max read buffer size */
-    public static final int DEFAULT_DATA_TX_THRIFT_MAX_READ_BUFFER
-      = 16 * 1024 * 1024;
-
-    // Configuration key names and defaults used by tx client.
-
-    /** to specify the tx client socket timeout in ms. */
-    public static final String CFG_DATA_TX_CLIENT_TIMEOUT
-      = "data.tx.client.timeout";
-
-    /** to specify the tx client provider strategy. */
-    public static final String CFG_DATA_TX_CLIENT_PROVIDER
-      = "data.tx.client.provider";
-
-    /** to specify the number of threads for client provider "pool". */
-    public static final String CFG_DATA_TX_CLIENT_COUNT
-      = "data.tx.client.count";
-
-    /** timeout (in milliseconds) for obtaining client from client provider "pool". */
-    public static final String CFG_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS
-      = "data.tx.client.obtain.timeout";
-
-    /** to specify the retry strategy for a failed thrift call. */
-    public static final String CFG_DATA_TX_CLIENT_RETRY_STRATEGY
-      = "data.tx.client.retry.strategy";
-
-    /** to specify the number of times to retry a failed thrift call. */
-    public static final String CFG_DATA_TX_CLIENT_ATTEMPTS
-      = "data.tx.client.retry.attempts";
-
-    /** to specify the initial sleep time for retry strategy backoff. */
-    public static final String CFG_DATA_TX_CLIENT_BACKOFF_INITIAL
-      = "data.tx.client.retry.backoff.initial";
-
-    /** to specify the backoff factor for retry strategy backoff. */
-    public static final String CFG_DATA_TX_CLIENT_BACKOFF_FACTOR
-      = "data.tx.client.retry.backoff.factor";
-
-    /** to specify the sleep time limit for retry strategy backoff. */
-    public static final String CFG_DATA_TX_CLIENT_BACKOFF_LIMIT
-      = "data.tx.client.retry.backoff.limit";
-
-    /** the default tx client socket timeout in milli seconds. */
-    public static final int DEFAULT_DATA_TX_CLIENT_TIMEOUT_MS
-      = 30 * 1000;
-
-    /** default number of tx clients for client provider "pool". */
-    public static final int DEFAULT_DATA_TX_CLIENT_COUNT
-      = 50;
-
-    /** default timeout (in milliseconds) for obtaining client from client provider "pool". */
-    public static final long DEFAULT_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS
-      = TimeUnit.SECONDS.toMillis(3);
-
-    /** default tx client provider strategy. */
-    public static final String DEFAULT_DATA_TX_CLIENT_PROVIDER
-      = "pool";
-
-    /** retry strategy for thrift clients, e.g. backoff, or n-times. */
-    public static final String DEFAULT_DATA_TX_CLIENT_RETRY_STRATEGY
-      = "backoff";
-
-    /** default number of attempts for strategy n-times. */
-    public static final int DEFAULT_DATA_TX_CLIENT_ATTEMPTS
-      = 2;
-
-    /** default initial sleep is 100ms. */
-    public static final int DEFAULT_DATA_TX_CLIENT_BACKOFF_INITIAL
-      = 100;
-
-    /** default backoff factor is 4. */
-    public static final int DEFAULT_DATA_TX_CLIENT_BACKOFF_FACTOR
-      = 4;
-
-    /** default sleep limit is 30 sec. */
-    public static final int DEFAULT_DATA_TX_CLIENT_BACKOFF_LIMIT
-      = 30 * 1000;
-  }
-
-  /**
-   * Configuration properties for metrics reporting
-   */
-  public static final class Metrics {
-    /**
-     * Frequency at which metrics should be reported, in seconds.
-     */
-    public static final String REPORT_PERIOD_KEY = "data.tx.metrics.period";
-    /**
-     * Default report period for metrics, in seconds.
-     */
-    public static final int REPORT_PERIOD_DEFAULT = 60;
-  }
-
-  /**
-   * Configuration properties used by HBase
-   */
-  public static final class HBase {
-    public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
-    public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";
-    public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000;
-  }
-
-  /**
-   * Configuration for the TransactionDataJanitor coprocessor.
-   */
-  public static final class DataJanitor {
-    /**
-     * Whether or not the TransactionDataJanitor coprocessor should be enabled on tables.
-     * Disable for testing.
-     */
-    public static final String CFG_TX_JANITOR_ENABLE = "data.tx.janitor.enable";
-    public static final boolean DEFAULT_TX_JANITOR_ENABLE = true;
-  }
-
-  /**
-   * Configuration for the transaction snapshot persistence.
-   */
-  public static final class Persist {
-    /**
-     * The class names of all known transaction snapshot codecs.
-     */
-    public static final String CFG_TX_SNAPHOT_CODEC_CLASSES = "data.tx.snapshot.codecs";
-    public static final Class[] DEFAULT_TX_SNAPHOT_CODEC_CLASSES = 
-      { DefaultSnapshotCodec.class, SnapshotCodecV2.class, SnapshotCodecV3.class, SnapshotCodecV4.class };
-  }
-
-  /**
-   * Configuration for transaction log edit entries
-   */
-  public static final class TransactionLog {
-    /**
-     * Key used to denote the number of entries appended.
-     */
-    public static final String NUM_ENTRIES_APPENDED = "count";
-    public static final String VERSION_KEY = "version";
-    public static final byte CURRENT_VERSION = 2;
-  }
-
-}