You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:47 UTC

[41/51] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java b/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java
deleted file mode 100644
index 2bc51b5..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import com.google.inject.Provider;
-import com.google.inject.ProvisionException;
-import org.apache.twill.internal.utils.Instances;
-
-/**
- * Common class factory behavior for classes which need specific implementations depending on HBase versions.
- * Specific factories can subclass this class and simply plug in the class names for their implementations.
- *
- * @param <T> Version specific class provided by this factory.
- */
-public abstract class HBaseVersionSpecificFactory<T> implements Provider<T> {
-  @Override
-  public T get() {
-    T instance = null;
-    try {
-      switch (HBaseVersion.get()) {
-        case HBASE_94:
-          throw new ProvisionException("HBase 0.94 is no longer supported.  Please upgrade to HBase 0.96 or newer.");
-        case HBASE_96:
-          instance = createInstance(getHBase96Classname());
-          break;
-        case HBASE_98:
-          instance = createInstance(getHBase98Classname());
-          break;
-        case HBASE_10:
-          instance = createInstance(getHBase10Classname());
-          break;
-        case HBASE_10_CDH:
-          instance = createInstance(getHBase10CDHClassname());
-          break;
-        case HBASE_11:
-        case HBASE_12_CDH:
-          instance = createInstance(getHBase11Classname());
-          break;
-        case UNKNOWN:
-          throw new ProvisionException("Unknown HBase version: " + HBaseVersion.getVersionString());
-      }
-    } catch (ClassNotFoundException cnfe) {
-      throw new ProvisionException(cnfe.getMessage(), cnfe);
-    }
-    return instance;
-  }
-
-  protected T createInstance(String className) throws ClassNotFoundException {
-    Class clz = Class.forName(className);
-    return (T) Instances.newInstance(clz);
-  }
-
-  protected abstract String getHBase96Classname();
-  protected abstract String getHBase98Classname();
-  protected abstract String getHBase10Classname();
-  protected abstract String getHBase10CDHClassname();
-  protected abstract String getHBase11Classname();
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java b/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java
deleted file mode 100644
index cdc9fef..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.persist.TransactionVisibilityState;
-import com.google.common.primitives.Longs;
-
-import java.util.Map;
-
-/**
- * Utility methods supporting transaction operations.
- */
-public class TxUtils {
-
-  // Any cell with timestamp less than MAX_NON_TX_TIMESTAMP is assumed to be pre-existing data,
-  // i.e. data written before table was converted into transactional table using Tephra.
-  // Using 1.1 times current time to determine whether a timestamp is transactional timestamp or not is safe, and does
-  // not produce any false positives or false negatives.
-  //
-  // To prove this, let's say the earliest transactional timestamp written by Tephra was in year 2000, and the oldest
-  // that will be written is in year 2200.
-  // 01-Jan-2000 GMT is  946684800000 milliseconds since epoch.
-  // 31-Dec-2200 GMT is 7289654399000 milliseconds since epoch.
-  //
-  // Let's say that we enabled transactions on a table on 01-Jan-2000, then 1.1 * 946684800000 = 31-Dec-2002. Using
-  // 31-Dec-2002, we can safely say from 01-Jan-2000 onwards, whether a cell timestamp is
-  // non-transactional (<= 946684800000).
-  // Note that transactional timestamp will be greater than 946684800000000000 (> year 31969) at this instant.
-  //
-  // On the other end, let's say we enabled transactions on a table on 31-Dec-2200,
-  // then 1.1 * 7289654399000 = 07-Feb-2224. Again, we can use this time from 31-Dec-2200 onwards to say whether a
-  // cell timestamp is transactional (<= 7289654399000).
-  // Note that transactional timestamp will be greater than 7289654399000000000 (> year 232969) at this instant.
-  private static final long MAX_NON_TX_TIMESTAMP = (long) (System.currentTimeMillis() * 1.1);
-
-  /**
-   * Returns the oldest visible timestamp for the given transaction, based on the TTLs configured for each column
-   * family.  If no TTL is set on any column family, the oldest visible timestamp will be {@code 0}.
-   * @param ttlByFamily A map of column family name to TTL value (in milliseconds)
-   * @param tx The current transaction
-   * @return The oldest timestamp that will be visible for the given transaction and TTL configuration
-   */
-  public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx) {
-    long maxTTL = getMaxTTL(ttlByFamily);
-    // we know that data will not be cleaned up while this tx is running up to this point as janitor uses it
-    return maxTTL < Long.MAX_VALUE ? tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS : 0;
-  }
-
-  /**
-   * Returns the oldest visible timestamp for the given transaction, based on the TTLs configured for each column
-   * family.  If no TTL is set on any column family, the oldest visible timestamp will be {@code 0}.
-   * @param ttlByFamily A map of column family name to TTL value (in milliseconds)
-   * @param tx The current transaction
-   * @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data
-   * @return The oldest timestamp that will be visible for the given transaction and TTL configuration
-   */
-  public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx, boolean readNonTxnData) {
-    if (readNonTxnData) {
-      long maxTTL = getMaxTTL(ttlByFamily);
-      return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL : 0;
-    }
-
-    return getOldestVisibleTimestamp(ttlByFamily, tx);
-  }
-
-  /**
-   * Returns the maximum timestamp to use for time-range operations, based on the given transaction.
-   * @param tx The current transaction
-   * @return The maximum timestamp (exclusive) to use for time-range operations
-   */
-  public static long getMaxVisibleTimestamp(Transaction tx) {
-    // NOTE: +1 here because we want read up to writepointer inclusive, but timerange's end is exclusive
-    // however, we also need to guard against overflow in the case write pointer is set to MAX_VALUE
-    return tx.getWritePointer() < Long.MAX_VALUE ?
-        tx.getWritePointer() + 1 : tx.getWritePointer();
-  }
-
-  /**
-   * Creates a "dummy" transaction based on the given txVisibilityState's state.  This is not a "real" transaction in
-   * the sense that it has not been started, data should not be written with it, and it cannot be committed.  However,
-   * this can still be useful for filtering data according to the txVisibilityState's state.  Instead of the actual
-   * write pointer from the txVisibilityState, however, we use {@code Long.MAX_VALUE} to avoid mis-identifying any cells
-   * as being written by this transaction (and therefore visible).
-   */
-  public static Transaction createDummyTransaction(TransactionVisibilityState txVisibilityState) {
-    return new Transaction(txVisibilityState.getReadPointer(), Long.MAX_VALUE,
-                           Longs.toArray(txVisibilityState.getInvalid()),
-                           Longs.toArray(txVisibilityState.getInProgress().keySet()),
-                           TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT);
-  }
-
-  /**
-   * Returns the write pointer for the first "short" transaction that in the in-progress set, or
-   * {@link Transaction#NO_TX_IN_PROGRESS} if none.
-   */
-  public static long getFirstShortInProgress(Map<Long, TransactionManager.InProgressTx> inProgress) {
-    long firstShort = Transaction.NO_TX_IN_PROGRESS;
-    for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
-      if (!entry.getValue().isLongRunning()) {
-        firstShort = entry.getKey();
-        break;
-      }
-    }
-    return firstShort;
-  }
-
-  /**
-   * Returns the timestamp for calculating time to live for the given cell timestamp.
-   * This takes into account pre-existing non-transactional cells while calculating the time.
-   */
-  public static long getTimestampForTTL(long cellTs) {
-    return isPreExistingVersion(cellTs) ? cellTs * TxConstants.MAX_TX_PER_MS : cellTs;
-  }
-
-  /**
-   * Returns the max TTL for the given TTL values. Returns Long.MAX_VALUE if any of the column families has no TTL set.
-   */
-  private static long getMaxTTL(Map<byte[], Long> ttlByFamily) {
-    long maxTTL = 0;
-    for (Long familyTTL : ttlByFamily.values()) {
-      maxTTL = Math.max(familyTTL <= 0 ? Long.MAX_VALUE : familyTTL, maxTTL);
-    }
-    return maxTTL == 0 ? Long.MAX_VALUE : maxTTL;
-  }
-
-  /**
-   * Returns true if version was written before table was converted into transactional table, false otherwise.
-   */
-  public static boolean isPreExistingVersion(long version) {
-    return version < MAX_NON_TX_TIMESTAMP;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java b/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java
deleted file mode 100644
index 3d5b25b..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import com.google.common.base.Stopwatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Default implementation of {@link FenceWait}.
- */
-public class DefaultFenceWait implements FenceWait {
-  private static final Logger LOG = LoggerFactory.getLogger(DefaultFenceWait.class);
-
-  private final TransactionContext txContext;
-
-  DefaultFenceWait(TransactionContext txContext) {
-    this.txContext = txContext;
-  }
-
-  @Override
-  public void await(long timeout, TimeUnit timeUnit)
-    throws TransactionFailureException, InterruptedException, TimeoutException {
-    Stopwatch stopwatch = new Stopwatch();
-    stopwatch.start();
-    long sleepTimeMicros = timeUnit.toMicros(timeout) / 10;
-    // Have sleep time to be within 1 microsecond and 500 milliseconds
-    sleepTimeMicros = Math.max(Math.min(sleepTimeMicros, 500 * 1000), 1);
-    while (stopwatch.elapsedTime(timeUnit) < timeout) {
-      txContext.start();
-      try {
-        txContext.finish();
-        return;
-      } catch (TransactionFailureException e) {
-        LOG.error("Got exception waiting for fence. Sleeping for {} microseconds", sleepTimeMicros, e);
-        txContext.abort();
-        TimeUnit.MICROSECONDS.sleep(sleepTimeMicros);
-      }
-    }
-    throw new TimeoutException("Timeout waiting for fence");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java b/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java
deleted file mode 100644
index a5370b2..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.TransactionFailureException;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions.
- */
-public interface FenceWait {
-  /**
-   * Waits until the fence is complete, or till the timeout specified. The fence wait transaction will get re-tried
-   * several times until the timeout.
-   * <p>
-   *
-   * If a fence wait times out then it means there are still some readers with in-progress transactions that have not
-   * seen the change. In this case the wait will have to be retried using the same FenceWait object.
-   *
-   * @param timeout Maximum time to wait
-   * @param timeUnit {@link TimeUnit} for timeout and sleepTime
-   * @throws TransactionFailureException when not able to start fence wait transaction
-   * @throws InterruptedException on any interrupt
-   * @throws TimeoutException when timeout is reached
-   */
-  void await(long timeout, TimeUnit timeUnit)
-    throws TransactionFailureException, InterruptedException, TimeoutException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java
deleted file mode 100644
index f7b8850..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionAware;
-import com.google.common.primitives.Bytes;
-import com.google.common.primitives.Longs;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * Implementation of {@link VisibilityFence} used by reader.
- */
-class ReadFence implements TransactionAware {
-  private final byte[] fenceId;
-  private Transaction tx;
-
-  public ReadFence(byte[] fenceId) {
-    this.fenceId = fenceId;
-  }
-
-  @Override
-  public void startTx(Transaction tx) {
-    this.tx = tx;
-  }
-
-  @Override
-  public void updateTx(Transaction tx) {
-    // Fences only need original transaction
-  }
-
-  @Override
-  public Collection<byte[]> getTxChanges() {
-    if (tx == null) {
-      throw new IllegalStateException("Transaction has not started yet");
-    }
-    return Collections.singleton(Bytes.concat(fenceId, Longs.toByteArray(tx.getTransactionId())));
-  }
-
-  @Override
-  public boolean commitTx() throws Exception {
-    // Nothing to persist
-    return true;
-  }
-
-  @Override
-  public void postTxCommit() {
-    tx = null;
-  }
-
-  @Override
-  public boolean rollbackTx() throws Exception {
-    // Nothing to rollback
-    return true;
-  }
-
-  @Override
-  public String getTransactionAwareName() {
-    return getClass().getSimpleName();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java
deleted file mode 100644
index beb92ed..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionSystemClient;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * VisibilityFence is used to ensure that after a given point in time, all readers see an updated change
- * that got committed.
- * <p>
- *
- * Typically a reader will never conflict with a writer, since a reader only sees committed changes when its
- * transaction started. However to ensure that after a given point all readers are aware of a change,
- * we have to introduce a conflict between a reader and a writer that act on the same data concurrently.
- *<p>
- *
- * This is done by the reader indicating that it is interested in changes to a piece of data by using a fence
- * in its transaction. If there are no changes to the data when reader tries to commit the transaction
- * containing the fence, the commit succeeds.
- * <p>
- *
- * On the other hand, a writer updates the same data in a transaction. After the write transaction is committed,
- * the writer then waits on the fence to ensure that all in-progress readers are aware of this update.
- * When the wait on the fence returns successfully, it means that
- * any in-progress readers that have not seen the change will not be allowed to commit anymore. This will
- * force the readers to start a new transaction, and this ensures that the changes made by writer are visible
- * to the readers.
- *<p>
- *
- * In case an in-progress reader commits when the writer is waiting on the fence, then the wait method will retry
- * until the given timeout.
- * <p>
- *
- * Hence a successful await on a fence ensures that any reader (using the same fence) that successfully commits after
- * this point onwards would see the change.
- *
- * <p>
- * Sample reader code:
- * <pre>
- *   <code>
- * TransactionAware fence = VisibilityFence.create(fenceId);
- * TransactionContext readTxContext = new TransactionContext(txClient, fence, table1, table2, ...);
- * readTxContext.start();
- *
- * // do operations using table1, table2, etc.
- *
- * // finally commit
- * try {
- *   readTxContext.finish();
- * } catch (TransactionConflictException e) {
- *   // handle conflict by aborting and starting over with a new transaction
- * }
- *   </code>
- * </pre>
- *<p>
- *
- * Sample writer code:
- * <pre>
- *   <code>
- * // start transaction
- * // write change
- * // commit transaction
- *
- * // Now wait on the fence (with the same fenceId as the readers) to ensure that all in-progress readers are
- * aware of this change
- * try {
- *   FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, txClient);
- *   fenceWait.await(50000, 50, TimeUnit.MILLISECONDS);
- * } catch (TimeoutException e) {
- *   // await timed out, the change may not be visible to all in-progress readers.
- *   // Application has two options at this point:
- *   // 1. Revert the write. Re-try the write and fence wait again.
- *   // 2. Retry only the wait with the same fenceWait object (retry logic is not shown here).
- * }
- *   </code>
- * </pre>
- *
- * fenceId in the above samples refers to any id that both the readers and writer know for a given
- * piece of data. Both readers and writer will have to use the same fenceId to synchronize on a given data.
- * Typically fenceId uniquely identifies the data in question.
- * For example, if the data is a table row, the fenceId can be composed of table name and row key.
- * If the data is a table cell, the fenceId can be composed of table name, row key, and column key.
- *<p>
- *
- * Note that in this implementation, any reader that starts a transaction after the write is committed, and
- * while this read transaction is in-progress, if a writer successfully starts and completes an await on the fence then
- * this reader will get a conflict while committing the fence even though this reader has seen the latest changes.
- * This is because today there is no way to determine the commit time of a transaction.
- */
-public final class VisibilityFence {
-  private VisibilityFence() {
-    // Cannot instantiate this class, all functionality is through static methods.
-  }
-
-  /**
-   * Used by a reader to get a fence that can be added to its transaction context.
-   *
-   * @param fenceId uniquely identifies the data that this fence is used to synchronize.
-   *                  If the data is a table cell then this id can be composed of the table name, row key
-   *                  and column key for the data.
-   * @return {@link TransactionAware} to be added to reader's transaction context.
-   */
-  public static TransactionAware create(byte[] fenceId) {
-    return new ReadFence(fenceId);
-  }
-
-  /**
-   * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions.
-   *
-   * @param fenceId uniquely identifies the data that this fence is used to synchronize.
-   *                  If the data is a table cell then this id can be composed of the table name, row key
-   *                  and column key for the data.
-   * @return {@link FenceWait} object
-   */
-  public static FenceWait prepareWait(byte[] fenceId, TransactionSystemClient txClient)
-    throws TransactionFailureException, InterruptedException, TimeoutException {
-    return new DefaultFenceWait(new TransactionContext(txClient, new WriteFence(fenceId)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java
deleted file mode 100644
index a16264d..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionAware;
-import com.google.common.primitives.Bytes;
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedBytes;
-
-import java.util.Collection;
-import java.util.TreeSet;
-
-/**
- * Implementation used by {@link FenceWait} to wait for a {@link VisibilityFence}.
- */
-class WriteFence implements TransactionAware {
-  private final byte[] fenceId;
-  private Transaction tx;
-  private Collection<byte[]> inProgressChanges;
-
-  public WriteFence(byte[] fenceId) {
-    this.fenceId = fenceId;
-  }
-
-  @Override
-  public void startTx(Transaction tx) {
-    this.tx = tx;
-    if (inProgressChanges == null) {
-      inProgressChanges = new TreeSet<>(UnsignedBytes.lexicographicalComparator());
-      for (long inProgressTx : tx.getInProgress()) {
-        inProgressChanges.add(Bytes.concat(fenceId, Longs.toByteArray(inProgressTx)));
-      }
-    }
-  }
-
-  @Override
-  public void updateTx(Transaction tx) {
-    // Fences only need original transaction
-  }
-
-  @Override
-  public Collection<byte[]> getTxChanges() {
-    if (inProgressChanges == null || tx == null) {
-      throw new IllegalStateException("Transaction has not started yet");
-    }
-    return inProgressChanges;
-  }
-
-  @Override
-  public boolean commitTx() throws Exception {
-    // Nothing to persist
-    return true;
-  }
-
-  @Override
-  public void postTxCommit() {
-    tx = null;
-  }
-
-  @Override
-  public boolean rollbackTx() throws Exception {
-    // Nothing to rollback
-    return true;
-  }
-
-  @Override
-  public String getTransactionAwareName() {
-    return getClass().getSimpleName();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java
deleted file mode 100644
index ceaffad..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.zookeeper;
-
-import org.apache.twill.zookeeper.ACLData;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.List;
-
-/**
- * A straightforward implementation of {@link ACLData}.
- */
-final class BasicACLData implements ACLData {
-
-  private final List<ACL> acl;
-  private final Stat stat;
-
-  BasicACLData(List<ACL> acl, Stat stat) {
-    this.acl = acl;
-    this.stat = stat;
-  }
-
-  @Override
-  public List<ACL> getACL() {
-    return acl;
-  }
-
-  @Override
-  public Stat getStat() {
-    return stat;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java
deleted file mode 100644
index ce81ade..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.zookeeper;
-
-import com.google.common.base.Objects;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.List;
-
-/**
- * Implementation of the {@link NodeChildren}.
- */
-final class BasicNodeChildren implements NodeChildren {
-
-  private final Stat stat;
-  private final List<String> children;
-
-  BasicNodeChildren(List<String> children, Stat stat) {
-    this.stat = stat;
-    this.children = children;
-  }
-
-  @Override
-  public Stat getStat() {
-    return stat;
-  }
-
-  @Override
-  public List<String> getChildren() {
-    return children;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || !(o instanceof NodeChildren)) {
-      return false;
-    }
-
-    NodeChildren that = (NodeChildren) o;
-    return stat.equals(that.getStat()) && children.equals(that.getChildren());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(children, stat);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java
deleted file mode 100644
index b09ff4b..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.zookeeper;
-
-import com.google.common.base.Objects;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.Arrays;
-
-/**
- * A straightforward implementation for {@link NodeData}.
- */
-final class BasicNodeData implements NodeData {
-
-  private final byte[] data;
-  private final Stat stat;
-
-  BasicNodeData(byte[] data, Stat stat) {
-    this.data = data;
-    this.stat = stat;
-  }
-
-  @Override
-  public Stat getStat() {
-    return stat;
-  }
-
-  @Override
-  public byte[] getData() {
-    return data;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || !(o instanceof NodeData)) {
-      return false;
-    }
-
-    BasicNodeData that = (BasicNodeData) o;
-
-    return stat.equals(that.getStat()) && Arrays.equals(data, that.getData());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(data, stat);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java
deleted file mode 100644
index 5f6f565..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java
+++ /dev/null
@@ -1,627 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.zookeeper;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.zookeeper.SettableOperationFuture;
-import org.apache.twill.zookeeper.ACLData;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nullable;
-
-/**
- * The implementation of {@link ZKClientService}.
- */
-public class TephraZKClientService extends AbstractService implements ZKClientService, Watcher {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TephraZKClientService.class);
-
-  private final String zkStr;
-  private final int sessionTimeout;
-  private final List<Watcher> connectionWatchers;
-  private final Multimap<String, byte[]> authInfos;
-  private final AtomicReference<ZooKeeper> zooKeeper;
-  private final Runnable stopTask;
-  private ExecutorService eventExecutor;
-
-  /**
-   * Create a new instance.
-   * @param zkStr zookeper connection string
-   * @param sessionTimeout timeout in milliseconds
-   * @param connectionWatcher watcher to set
-   * @param authInfos authorization bytes
-   */
-  public TephraZKClientService(String zkStr, int sessionTimeout,
-                               Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
-    this.zkStr = zkStr;
-    this.sessionTimeout = sessionTimeout;
-    this.connectionWatchers = new CopyOnWriteArrayList<>();
-    this.authInfos = copyAuthInfo(authInfos);
-    addConnectionWatcher(connectionWatcher);
-
-    this.zooKeeper = new AtomicReference<>();
-    this.stopTask = createStopTask();
-  }
-
-  @Override
-  public Long getSessionId() {
-    ZooKeeper zk = zooKeeper.get();
-    return zk == null ? null : zk.getSessionId();
-  }
-
-  @Override
-  public String getConnectString() {
-    return zkStr;
-  }
-
-  @Override
-  public Cancellable addConnectionWatcher(final Watcher watcher) {
-    if (watcher == null) {
-      return new Cancellable() {
-        @Override
-        public void cancel() {
-          // No-op
-        }
-      };
-    }
-
-    // Invocation of connection watchers are already done inside the event thread,
-    // hence no need to wrap the watcher again.
-    connectionWatchers.add(watcher);
-    return new Cancellable() {
-      @Override
-      public void cancel() {
-        connectionWatchers.remove(watcher);
-      }
-    };
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
-    return create(path, data, createMode, true);
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data,
-                                        CreateMode createMode, boolean createParent) {
-    return create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data,
-                                        CreateMode createMode, Iterable<ACL> acl) {
-    return create(path, data, createMode, true, acl);
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(String path) {
-    return exists(path, null);
-  }
-
-  @Override
-  public OperationFuture<NodeChildren> getChildren(String path) {
-    return getChildren(path, null);
-  }
-
-  @Override
-  public OperationFuture<NodeData> getData(String path) {
-    return getData(path, null);
-  }
-
-  @Override
-  public OperationFuture<Stat> setData(String path, byte[] data) {
-    return setData(path, data, -1);
-  }
-
-  @Override
-  public OperationFuture<String> delete(String path) {
-    return delete(path, -1);
-  }
-
-  @Override
-  public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl) {
-    return setACL(path, acl, -1);
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
-                                        boolean createParent, Iterable<ACL> acl) {
-    return doCreate(path, data, createMode, createParent, ImmutableList.copyOf(acl), false);
-  }
-
-  private OperationFuture<String> doCreate(final String path,
-                                           @Nullable final byte[] data,
-                                           final CreateMode createMode,
-                                           final boolean createParent,
-                                           final List<ACL> acl,
-                                           final boolean ignoreNodeExists) {
-    final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
-    getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, createFuture);
-    if (!createParent) {
-      return createFuture;
-    }
-
-    // If create parent is request, return a different future
-    final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor);
-    // Watch for changes in the original future
-    Futures.addCallback(createFuture, new FutureCallback<String>() {
-      @Override
-      public void onSuccess(String path) {
-        // Propagate if creation was successful
-        result.set(path);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // See if the failure can be handled
-        if (updateFailureResult(t, result, path, ignoreNodeExists)) {
-          return;
-        }
-        // Create the parent node
-        String parentPath = getParent(path);
-        if (parentPath.isEmpty()) {
-          result.setException(t);
-          return;
-        }
-        // Watch for parent creation complete. Parent is created with the unsafe ACL.
-        Futures.addCallback(doCreate(parentPath, null, CreateMode.PERSISTENT,
-                                     true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), new FutureCallback<String>() {
-          @Override
-          public void onSuccess(String parentPath) {
-            // Create the requested path again
-            Futures.addCallback(
-              doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() {
-                @Override
-                public void onSuccess(String pathResult) {
-                  result.set(pathResult);
-                }
-
-                @Override
-                public void onFailure(Throwable t) {
-                  // handle the failure
-                  updateFailureResult(t, result, path, ignoreNodeExists);
-                }
-              });
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            result.setException(t);
-          }
-        });
-      }
-
-      /**
-       * Updates the result future based on the given {@link Throwable}.
-       * @param t Cause of the failure
-       * @param result Future to be updated
-       * @param path Request path for the operation
-       * @return {@code true} if it is a failure, {@code false} otherwise.
-       */
-      private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result,
-                                          String path, boolean ignoreNodeExists) {
-        // Propagate if there is error
-        if (!(t instanceof KeeperException)) {
-          result.setException(t);
-          return true;
-        }
-        KeeperException.Code code = ((KeeperException) t).code();
-        // Node already exists, simply return success if it allows for ignoring node exists (for parent node creation).
-        if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) {
-          // The requested path could be used because it only applies to non-sequential node
-          result.set(path);
-          return false;
-        }
-        if (code != KeeperException.Code.NONODE) {
-          result.setException(t);
-          return true;
-        }
-        return false;
-      }
-
-      /**
-       * Gets the parent of the given path.
-       * @param path Path for computing its parent
-       * @return Parent of the given path, or empty string if the given path is the root path already.
-       */
-      private String getParent(String path) {
-        String parentPath = path.substring(0, path.lastIndexOf('/'));
-        return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath;
-      }
-    });
-
-    return result;
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(String path, Watcher watcher) {
-    SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
-    getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result);
-    return result;
-  }
-
-  @Override
-  public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
-    SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor);
-    getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result);
-    return result;
-  }
-
-  @Override
-  public OperationFuture<NodeData> getData(String path, Watcher watcher) {
-    SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor);
-    getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result);
-
-    return result;
-  }
-
-  @Override
-  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
-    SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor);
-    getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result);
-    return result;
-  }
-
-  @Override
-  public OperationFuture<String> delete(String deletePath, int version) {
-    SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor);
-    getZooKeeper().delete(deletePath, version, Callbacks.VOID, result);
-    return result;
-  }
-
-  @Override
-  public OperationFuture<ACLData> getACL(String path) {
-    SettableOperationFuture<ACLData> result = SettableOperationFuture.create(path, eventExecutor);
-    getZooKeeper().getACL(path, new Stat(), Callbacks.ACL, result);
-    return result;
-  }
-
-  @Override
-  public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
-    SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
-    getZooKeeper().setACL(path, ImmutableList.copyOf(acl), version, Callbacks.STAT, result);
-    return result;
-  }
-
-  @Override
-  public Supplier<ZooKeeper> getZooKeeperSupplier() {
-    return new Supplier<ZooKeeper>() {
-      @Override
-      public ZooKeeper get() {
-        return getZooKeeper();
-      }
-    };
-  }
-
-  @Override
-  protected void doStart() {
-    // A single thread executor for all events
-    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
-                                                         new LinkedBlockingQueue<Runnable>(),
-                                                         Threads.createDaemonThreadFactory("zk-client-EventThread"));
-    // Just discard the execution if the executor is closed
-    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
-    eventExecutor = executor;
-
-    try {
-      zooKeeper.set(createZooKeeper());
-    } catch (IOException e) {
-      notifyFailed(e);
-    }
-  }
-
-  @Override
-  protected void doStop() {
-    // Submit a task to the executor to make sure all pending events in the executor are fired before
-    // transiting this Service into STOPPED state
-    eventExecutor.submit(stopTask);
-    eventExecutor.shutdown();
-  }
-
-  /**
-   * @return Current {@link ZooKeeper} client.
-   */
-  private ZooKeeper getZooKeeper() {
-    ZooKeeper zk = zooKeeper.get();
-    Preconditions.checkArgument(zk != null, "Not connected to zooKeeper.");
-    return zk;
-  }
-
-  /**
-   * Wraps the given watcher to be called from the event executor.
-   * @param watcher Watcher to be wrapped
-   * @return The wrapped Watcher
-   */
-  private Watcher wrapWatcher(final Watcher watcher) {
-    if (watcher == null) {
-      return null;
-    }
-    return new Watcher() {
-      @Override
-      public void process(final WatchedEvent event) {
-        if (eventExecutor.isShutdown()) {
-          LOG.debug("Already shutdown. Discarding event: {}", event);
-          return;
-        }
-        eventExecutor.execute(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              watcher.process(event);
-            } catch (Throwable t) {
-              LOG.error("Watcher throws exception.", t);
-            }
-          }
-        });
-      }
-    };
-  }
-
-  /**
-   * Creates a deep copy of the given authInfos multimap.
-   */
-  private Multimap<String, byte[]> copyAuthInfo(Multimap<String, byte[]> authInfos) {
-    Multimap<String, byte[]> result = ArrayListMultimap.create();
-
-    for (Map.Entry<String, byte[]> entry : authInfos.entries()) {
-      byte[] info = entry.getValue();
-      result.put(entry.getKey(), info == null ? null : Arrays.copyOf(info, info.length));
-    }
-
-    return result;
-  }
-
-  @Override
-  public void process(WatchedEvent event) {
-    State state = state();
-    if (state == State.TERMINATED || state == State.FAILED) {
-      return;
-    }
-
-    try {
-      if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) {
-        LOG.debug("Connected to ZooKeeper: {}", zkStr);
-        notifyStarted();
-        return;
-      }
-      if (event.getState() == Event.KeeperState.Expired) {
-        LOG.info("ZooKeeper session expired: {}", zkStr);
-
-        // When connection expired, simply reconnect again
-        if (state != State.RUNNING) {
-          return;
-        }
-        eventExecutor.submit(new Runnable() {
-          @Override
-          public void run() {
-            // Only reconnect if the current state is running
-            if (state() != State.RUNNING) {
-              return;
-            }
-            try {
-              LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
-              closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
-            } catch (IOException e) {
-              notifyFailed(e);
-            }
-          }
-        });
-      }
-    } finally {
-      if (event.getType() == Event.EventType.None) {
-        for (Watcher connectionWatcher : connectionWatchers) {
-          connectionWatcher.process(event);
-        }
-      }
-    }
-  }
-
-  /**
-   * Creates a new ZooKeeper connection.
-   */
-  private ZooKeeper createZooKeeper() throws IOException {
-    ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this));
-    for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
-      zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
-    }
-    return zk;
-  }
-
-  /**
-   * Closes the given {@link ZooKeeper} if it is not null. If there is InterruptedException,
-   * it will get logged.
-   */
-  private void closeZooKeeper(@Nullable ZooKeeper zk) {
-    try {
-      if (zk != null) {
-        zk.close();
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted when closing ZooKeeper", e);
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Creates a {@link Runnable} task that will get executed in the event executor for transiting this
-   * Service into STOPPED state.
-   */
-  private Runnable createStopTask() {
-    return new Runnable() {
-      @Override
-      public void run() {
-        try {
-          // Close the ZK connection in this task will make sure if there is ZK connection created
-          // after doStop() was called but before this task has been executed is also closed.
-          // It is possible to happen when the following sequence happens:
-          //
-          // 1. session expired, hence the expired event is triggered
-          // 2. The reconnect task executed. With Service.state() == RUNNING, it creates a new ZK client
-          // 3. Service.stop() gets called, Service.state() changed to STOPPING
-          // 4. The new ZK client created from the reconnect thread update the zooKeeper with the new one
-          closeZooKeeper(zooKeeper.getAndSet(null));
-          notifyStopped();
-        } catch (Exception e) {
-          notifyFailed(e);
-        }
-      }
-    };
-  }
-
-  /**
-   * Collection of generic callbacks that simply reflect results into OperationFuture.
-   */
-  private static final class Callbacks {
-    static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx, String name) {
-        SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK) {
-          result.set((name == null || name.isEmpty()) ? path : name);
-          return;
-        }
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-
-    static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx, Stat stat) {
-        SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK) {
-          result.set(stat);
-          return;
-        }
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-
-    /**
-     * A stat callback that treats NONODE as success.
-     */
-    static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx, Stat stat) {
-        SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) {
-          result.set(stat);
-          return;
-        }
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-
-    static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-        SettableOperationFuture<NodeChildren> result = (SettableOperationFuture<NodeChildren>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK) {
-          result.set(new BasicNodeChildren(children, stat));
-          return;
-        }
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-
-    static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-        SettableOperationFuture<NodeData> result = (SettableOperationFuture<NodeData>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK) {
-          result.set(new BasicNodeData(data, stat));
-          return;
-        }
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-
-    static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx) {
-        SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK) {
-          result.set(result.getRequestPath());
-          return;
-        }
-        // Otherwise, it is an error
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-
-    static final AsyncCallback.ACLCallback ACL = new AsyncCallback.ACLCallback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
-        SettableOperationFuture<ACLData> result = (SettableOperationFuture<ACLData>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK) {
-          result.set(new BasicACLData(acl, stat));
-          return;
-        }
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java
new file mode 100644
index 0000000..64fd7ed
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.UnsignedBytes;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Base class for all the common parts of the HBase version-specific {@code TransactionAwareHTable}
+ * implementations.
+ */
+public abstract class AbstractTransactionAwareTable implements TransactionAware {
+  protected final TransactionCodec txCodec;
+  // map of write pointers to change set associated with each
+  protected final Map<Long, Set<ActionChange>> changeSets;
+  protected final TxConstants.ConflictDetection conflictLevel;
+  protected Transaction tx;
+  protected boolean allowNonTransactional;
+
+  public AbstractTransactionAwareTable(TxConstants.ConflictDetection conflictLevel, boolean allowNonTransactional) {
+    this.conflictLevel = conflictLevel;
+    this.allowNonTransactional = allowNonTransactional;
+    this.txCodec = new TransactionCodec();
+    this.changeSets = Maps.newHashMap();
+  }
+
+  /**
+   * True if the instance allows non-transaction operations.
+   * @return
+   */
+  public boolean getAllowNonTransactional() {
+    return this.allowNonTransactional;
+  }
+
+  /**
+   * Set whether the instance allows non-transactional operations.
+   * @param allowNonTransactional
+   */
+  public void setAllowNonTransactional(boolean allowNonTransactional) {
+    this.allowNonTransactional = allowNonTransactional;
+  }
+
+  @Override
+  public void startTx(Transaction tx) {
+    this.tx = tx;
+  }
+
+  @Override
+  public void updateTx(Transaction tx) {
+    this.tx = tx;
+  }
+
+  @Override
+  public Collection<byte[]> getTxChanges() {
+    if (conflictLevel == TxConstants.ConflictDetection.NONE) {
+      return Collections.emptyList();
+    }
+
+    Collection<byte[]> txChanges = new TreeSet<byte[]>(UnsignedBytes.lexicographicalComparator());
+    for (Set<ActionChange> changeSet : changeSets.values()) {
+      for (ActionChange change : changeSet) {
+        txChanges.add(getChangeKey(change.getRow(), change.getFamily(), change.getQualifier()));
+      }
+    }
+    return txChanges;
+  }
+
+  public byte[] getChangeKey(byte[] row, byte[] family, byte[] qualifier) {
+    byte[] key;
+    switch (conflictLevel) {
+      case ROW:
+        key = Bytes.concat(getTableKey(), row);
+        break;
+      case COLUMN:
+        key = Bytes.concat(getTableKey(), row, family, qualifier);
+        break;
+      case NONE:
+        throw new IllegalStateException("NONE conflict detection does not support change keys");
+      default:
+        throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
+    }
+    return key;
+  }
+
+  @Override
+  public boolean commitTx() throws Exception {
+    return doCommit();
+  }
+
+  /**
+   * Commits any pending writes by flushing the wrapped {@code HTable} instance.
+   */
+  protected abstract boolean doCommit() throws IOException;
+
+  @Override
+  public void postTxCommit() {
+    tx = null;
+    changeSets.clear();
+  }
+
+  @Override
+  public String getTransactionAwareName() {
+    return new String(getTableKey(), Charsets.UTF_8);
+  }
+
+  /**
+   * Returns the table name to use as a key prefix for the transaction change set.
+   */
+  protected abstract byte[] getTableKey();
+
+  @Override
+  public boolean rollbackTx() throws Exception {
+    return doRollback();
+  }
+
+  /**
+   * Rolls back any persisted changes from the transaction by issuing offsetting deletes to the
+   * wrapped {@code HTable} instance.  How this is handled will depend on the delete API exposed
+   * by the specific version of HBase.
+   */
+  protected abstract boolean doRollback() throws Exception;
+
+  protected void addToChangeSet(byte[] row, byte[] family, byte[] qualifier) {
+    long currentWritePointer = tx.getWritePointer();
+    Set<ActionChange> changeSet = changeSets.get(currentWritePointer);
+    if (changeSet == null) {
+      changeSet = Sets.newHashSet();
+      changeSets.put(currentWritePointer, changeSet);
+    }
+    switch (conflictLevel) {
+      case ROW:
+      case NONE:
+        // with ROW or NONE conflict detection, we still need to track changes per-family, since this
+        // is the granularity at which we will issue deletes for rollback
+        changeSet.add(new ActionChange(row, family));
+        break;
+      case COLUMN:
+        changeSet.add(new ActionChange(row, family, qualifier));
+        break;
+      default:
+        throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
+    }
+  }
+
+  /**
+   * Record of each transaction that causes a change. This reference is used to rollback
+   * any operation upon failure.
+   */
+  protected class ActionChange {
+    private final byte[] row;
+    private final byte[] family;
+    private final byte[] qualifier;
+
+    public ActionChange(byte[] row, byte[] family) {
+      this(row, family, null);
+    }
+
+    public ActionChange(byte[] row, byte[] family, byte[] qualifier) {
+      this.row = row;
+      this.family = family;
+      this.qualifier = qualifier;
+    }
+
+    public byte[] getRow() {
+      return row;
+    }
+
+    public byte[] getFamily() {
+      return family;
+    }
+
+    public byte[] getQualifier() {
+      return qualifier;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null || o.getClass() != this.getClass()) {
+        return false;
+      }
+
+      if (o == this) {
+        return true;
+      }
+
+      ActionChange other = (ActionChange) o;
+      return Objects.equal(this.row, other.row) &&
+             Objects.equal(this.family, other.family) &&
+             Objects.equal(this.qualifier, other.qualifier);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Arrays.hashCode(row);
+      result = 31 * result + (family != null ? Arrays.hashCode(family) : 0);
+      result = 31 * result + (qualifier != null ? Arrays.hashCode(qualifier) : 0);
+      return result;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java
new file mode 100644
index 0000000..528085f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Provides implementation of asynchronous methods of {@link TransactionExecutor} by delegating their execution
+ * to respective synchronous methods via provided {@link ExecutorService}.
+ */
+public abstract class AbstractTransactionExecutor implements TransactionExecutor {
+  private final ListeningExecutorService executorService;
+
+  protected AbstractTransactionExecutor(ExecutorService executorService) {
+    this.executorService = MoreExecutors.listeningDecorator(executorService);
+  }
+
+  @Override
+  public <I, O> O executeUnchecked(Function<I, O> function, I input) {
+    try {
+      return execute(function, input);
+    } catch (TransactionFailureException e) {
+      throw Throwables.propagate(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public <I> void executeUnchecked(Procedure<I> procedure, I input) {
+    try {
+      execute(procedure, input);
+    } catch (TransactionFailureException e) {
+      throw Throwables.propagate(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public <O> O executeUnchecked(Callable<O> callable) {
+    try {
+      return execute(callable);
+    } catch (TransactionFailureException e) {
+      throw Throwables.propagate(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void executeUnchecked(Subroutine subroutine) {
+    try {
+      execute(subroutine);
+    } catch (TransactionFailureException e) {
+      throw Throwables.propagate(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public <I, O> ListenableFuture<O> submit(final Function<I, O> function, final I input) {
+    return executorService.submit(new Callable<O>() {
+      @Override
+      public O call() throws Exception {
+        return execute(function, input);
+      }
+    });
+  }
+
+  @Override
+  public <I> ListenableFuture<?> submit(final Procedure<I> procedure, final I input) {
+    return executorService.submit(new Callable<Object>() {
+      @Override
+      public I call() throws Exception {
+        execute(procedure, input);
+        return null;
+      }
+    });
+  }
+
+  @Override
+  public <O> ListenableFuture<O> submit(final Callable<O> callable) {
+    return executorService.submit(new Callable<O>() {
+      @Override
+      public O call() throws Exception {
+        return execute(callable);
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<?> submit(final Subroutine subroutine) {
+    return executorService.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        execute(subroutine);
+        return null;
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/ChangeId.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/ChangeId.java b/tephra-core/src/main/java/org/apache/tephra/ChangeId.java
new file mode 100644
index 0000000..0eb7191
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/ChangeId.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import java.util.Arrays;
+
+/**
+ * Represents a row key from a data set changed as part of a transaction.
+ */
+public final class ChangeId {
+  private final byte[] key;
+  private final int hash;
+
+  public ChangeId(byte[] bytes) {
+    key = bytes;
+    hash = Arrays.hashCode(bytes);
+  }
+
+  public byte[] getKey() {
+    return key;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != ChangeId.class) {
+      return false;
+    }
+    ChangeId other = (ChangeId) o;
+    return hash == other.hash && Arrays.equals(key, other.key);
+  }
+
+  @Override
+  public int hashCode() {
+    return hash;
+  }
+
+  @Override
+  public String toString() {
+    return toStringBinary(key, 0, key.length);
+  }
+
+  // Copy from Bytes.toStringBinary so that we don't need direct dependencies on Bytes.
+  private String toStringBinary(byte [] b, int off, int len) {
+    StringBuilder result = new StringBuilder();
+    for (int i = off; i < off + len; ++i) {
+      int ch = b[i] & 0xFF;
+      if ((ch >= '0' && ch <= '9')
+       || (ch >= 'A' && ch <= 'Z')
+       || (ch >= 'a' && ch <= 'z')
+       || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
+        result.append((char) ch);
+      } else {
+        result.append(String.format("\\x%02X", ch));
+      }
+    }
+    return result.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java b/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java
new file mode 100644
index 0000000..c5e1cb3
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class that encapsulates the transaction life cycle over a given set of
+ * transaction-aware datasets. The executor can be reused across multiple invocations
+ * of the execute() method. However, it is not thread-safe for concurrent execution.
+ * <p>
+ *   Transaction execution will be retries according to specified in constructor {@link RetryStrategy}.
+ *   By default {@link RetryOnConflictStrategy} is used with max 20 retries and 100 ms between retries.
+ * </p>
+ */
+public class DefaultTransactionExecutor extends AbstractTransactionExecutor {
+
+  private final Collection<TransactionAware> txAwares;
+  private final TransactionSystemClient txClient;
+  private final RetryStrategy retryStrategy;
+
+  /**
+   * Convenience constructor, has same affect as {@link #DefaultTransactionExecutor(TransactionSystemClient, Iterable)}
+   */
+  public DefaultTransactionExecutor(TransactionSystemClient txClient, TransactionAware... txAwares) {
+    this(txClient, Arrays.asList(txAwares));
+  }
+
+
+  public DefaultTransactionExecutor(TransactionSystemClient txClient,
+                                    Iterable<TransactionAware> txAwares,
+                                    RetryStrategy retryStrategy) {
+
+    super(MoreExecutors.sameThreadExecutor());
+    this.txAwares = ImmutableList.copyOf(txAwares);
+    this.txClient = txClient;
+    this.retryStrategy = retryStrategy;
+  }
+
+  /**
+   * Constructor for a transaction executor.
+   */
+  @Inject
+  public DefaultTransactionExecutor(TransactionSystemClient txClient, @Assisted Iterable<TransactionAware> txAwares) {
+    this(txClient, txAwares, RetryStrategies.retryOnConflict(20, 100));
+  }
+
+  @Override
+  public <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException {
+    return executeWithRetry(function, input);
+  }
+
+  @Override
+  public <I> void execute(final Procedure<I> procedure, I input)
+    throws TransactionFailureException, InterruptedException {
+
+    execute(new Function<I, Void>() {
+      @Override
+      public Void apply(I input) throws Exception {
+        procedure.apply(input);
+        return null;
+      }
+    }, input);
+  }
+
+  @Override
+  public <O> O execute(final Callable<O> callable) throws TransactionFailureException, InterruptedException {
+    return execute(new Function<Void, O>() {
+      @Override
+      public O apply(Void input) throws Exception {
+        return callable.call();
+      }
+    }, null);
+  }
+
+  @Override
+  public void execute(final Subroutine subroutine) throws TransactionFailureException, InterruptedException {
+    execute(new Function<Void, Void>() {
+      @Override
+      public Void apply(Void input) throws Exception {
+        subroutine.apply();
+        return null;
+      }
+    }, null);
+  }
+
+  private <I, O> O executeWithRetry(Function<I, O> function, I input)
+    throws TransactionFailureException, InterruptedException {
+
+    int retries = 0;
+    while (true) {
+      try {
+        return executeOnce(function, input);
+      } catch (TransactionFailureException e) {
+        long delay = retryStrategy.nextRetry(e, ++retries);
+
+        if (delay < 0) {
+          throw e;
+        }
+
+        if (delay > 0) {
+          TimeUnit.MILLISECONDS.sleep(delay);
+        }
+      }
+    }
+
+  }
+
+  private <I, O> O executeOnce(Function<I, O> function, I input) throws TransactionFailureException {
+    TransactionContext txContext = new TransactionContext(txClient, txAwares);
+    txContext.start();
+    O o = null;
+    try {
+      o = function.apply(input);
+    } catch (Throwable e) {
+      txContext.abort(new TransactionFailureException("Transaction function failure for transaction. ", e));
+      // abort will throw
+    }
+    // will throw if smth goes wrong
+    txContext.finish();
+    return o;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java b/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java
new file mode 100644
index 0000000..1cdbfb0
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Thrown when truncate invalid list is called with a time, and when there are in-progress transactions that
+ * were started before the given time.
+ */
+public class InvalidTruncateTimeException extends Exception {
+  public InvalidTruncateTimeException(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/NoRetryStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/NoRetryStrategy.java b/tephra-core/src/main/java/org/apache/tephra/NoRetryStrategy.java
new file mode 100644
index 0000000..4dc245d
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/NoRetryStrategy.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Does no retries
+ */
+public class NoRetryStrategy implements RetryStrategy {
+  public static final RetryStrategy INSTANCE = new NoRetryStrategy();
+
+  private NoRetryStrategy() {}
+
+  @Override
+  public long nextRetry(TransactionFailureException reason, int failureCount) {
+    return -1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java b/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java
new file mode 100644
index 0000000..82f069f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Retries transaction execution when transaction fails with {@link TransactionConflictException}.
+ */
+public class RetryOnConflictStrategy implements RetryStrategy {
+  private final int maxRetries;
+  private final long retryDelay;
+
+  public RetryOnConflictStrategy(int maxRetries, long retryDelay) {
+    this.maxRetries = maxRetries;
+    this.retryDelay = retryDelay;
+  }
+
+  @Override
+  public long nextRetry(TransactionFailureException reason, int failureCount) {
+    if (reason instanceof TransactionConflictException) {
+      return failureCount > maxRetries ? -1 : retryDelay;
+    } else {
+      return -1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java b/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java
new file mode 100644
index 0000000..9550d77
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Collection of {@link RetryStrategy}s.
+ */
+public final class RetryStrategies {
+  private RetryStrategies() {}
+
+  /**
+   * @param maxRetries max number of retries
+   * @param delayInMs delay between retries in milliseconds
+   * @return RetryStrategy that retries transaction execution when transaction fails with
+   *         {@link TransactionConflictException}
+   */
+  public static RetryStrategy retryOnConflict(int maxRetries, long delayInMs) {
+    return new RetryOnConflictStrategy(maxRetries, delayInMs);
+  }
+
+  public static RetryStrategy noRetries() {
+    return NoRetryStrategy.INSTANCE;
+  }
+}