You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:31 UTC

[31/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java
new file mode 100644
index 0000000..ebbbd18
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import org.apache.twill.internal.utils.Instances;
+
+/**
+ * Common class factory behavior for classes which need specific implementations depending on HBase versions.
+ * Specific factories can subclass this class and simply plug in the class names for their implementations.
+ *
+ * @param <T> Version specific class provided by this factory.
+ */
+public abstract class HBaseVersionSpecificFactory<T> implements Provider<T> {
+  @Override
+  public T get() {
+    T instance = null;
+    try {
+      switch (HBaseVersion.get()) {
+        case HBASE_94:
+          throw new ProvisionException("HBase 0.94 is no longer supported.  Please upgrade to HBase 0.96 or newer.");
+        case HBASE_96:
+          instance = createInstance(getHBase96Classname());
+          break;
+        case HBASE_98:
+          instance = createInstance(getHBase98Classname());
+          break;
+        case HBASE_10:
+          instance = createInstance(getHBase10Classname());
+          break;
+        case HBASE_10_CDH:
+          instance = createInstance(getHBase10CDHClassname());
+          break;
+        case HBASE_11:
+        case HBASE_12_CDH:
+          instance = createInstance(getHBase11Classname());
+          break;
+        case UNKNOWN:
+          throw new ProvisionException("Unknown HBase version: " + HBaseVersion.getVersionString());
+      }
+    } catch (ClassNotFoundException cnfe) {
+      throw new ProvisionException(cnfe.getMessage(), cnfe);
+    }
+    return instance;
+  }
+
+  protected T createInstance(String className) throws ClassNotFoundException {
+    Class clz = Class.forName(className);
+    return (T) Instances.newInstance(clz);
+  }
+
+  protected abstract String getHBase96Classname();
+  protected abstract String getHBase98Classname();
+  protected abstract String getHBase10Classname();
+  protected abstract String getHBase10CDHClassname();
+  protected abstract String getHBase11Classname();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
new file mode 100644
index 0000000..08b1545
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import com.google.common.primitives.Longs;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.persist.TransactionVisibilityState;
+
+import java.util.Map;
+
+/**
+ * Utility methods supporting transaction operations.
+ */
+public class TxUtils {
+
+  // Any cell with timestamp less than MAX_NON_TX_TIMESTAMP is assumed to be pre-existing data,
+  // i.e. data written before table was converted into transactional table using Tephra.
+  // Using 1.1 times current time to determine whether a timestamp is transactional timestamp or not is safe, and does
+  // not produce any false positives or false negatives.
+  //
+  // To prove this, let's say the earliest transactional timestamp written by Tephra was in year 2000, and the latest
+  // that will be written is in year 2200.
+  // 01-Jan-2000 GMT is  946684800000 milliseconds since epoch.
+  // 31-Dec-2200 GMT is 7289654399000 milliseconds since epoch.
+  //
+  // Let's say that we enabled transactions on a table on 01-Jan-2000, then 1.1 * 946684800000 = 31-Dec-2002. Using
+  // 31-Dec-2002, we can safely say from 01-Jan-2000 onwards, whether a cell timestamp is
+  // non-transactional (<= 946684800000).
+  // Note that transactional timestamp will be greater than 946684800000000000 (> year 31969) at this instant.
+  //
+  // On the other end, let's say we enabled transactions on a table on 31-Dec-2200,
+  // then 1.1 * 7289654399000 = 07-Feb-2224. Again, we can use this time from 31-Dec-2200 onwards to say whether a
+  // cell timestamp is non-transactional (<= 7289654399000).
+  // Note that transactional timestamp will be greater than 7289654399000000000 (> year 232969) at this instant.
+  //
+  // The threshold is computed once, when this class is initialized, from the JVM's current wall-clock time.
+  private static final long MAX_NON_TX_TIMESTAMP = (long) (System.currentTimeMillis() * 1.1);
+
+  /**
+   * Returns the oldest visible timestamp for the given transaction, based on the TTLs configured for each column
+   * family.  If no TTL is set on any column family, the oldest visible timestamp will be {@code 0}. The same value
+   * is returned when the largest TTL is so large that converting it into transaction-pointer units
+   * ({@code maxTTL * TxConstants.MAX_TX_PER_MS}) would overflow a {@code long}.
+   * @param ttlByFamily A map of column family name to TTL value (in milliseconds)
+   * @param tx The current transaction
+   * @return The oldest timestamp that will be visible for the given transaction and TTL configuration
+   */
+  public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx) {
+    long maxTTL = getMaxTTL(ttlByFamily);
+    // "No TTL" (Long.MAX_VALUE) also takes this branch. Any finite TTL at or above this bound would make the
+    // multiplication below wrap around and produce a meaningless bound. For any realistic visibility upper bound,
+    // such a TTL reaches back below zero anyway, so everything is visible.
+    if (maxTTL >= Long.MAX_VALUE / TxConstants.MAX_TX_PER_MS) {
+      return 0;
+    }
+    // we know that data will not be cleaned up while this tx is running up to this point as janitor uses it
+    return tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS;
+  }
+
+  /**
+   * Returns the oldest visible timestamp for the given transaction, based on the TTLs configured for each column
+   * family.  If no TTL is set on any column family, the oldest visible timestamp will be {@code 0}.
+   * @param ttlByFamily A map of column family name to TTL value (in milliseconds)
+   * @param tx The current transaction
+   * @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data; in that
+   *                       case the bound is computed in milliseconds from the current wall-clock time rather than
+   *                       from the transaction's visibility upper bound
+   * @return The oldest timestamp that will be visible for the given transaction and TTL configuration
+   */
+  public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx, boolean readNonTxnData) {
+    if (readNonTxnData) {
+      long maxTTL = getMaxTTL(ttlByFamily);
+      return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL : 0;
+    }
+
+    return getOldestVisibleTimestamp(ttlByFamily, tx);
+  }
+
+  /**
+   * Returns the maximum timestamp to use for time-range operations, based on the given transaction.
+   * @param tx The current transaction
+   * @return The maximum timestamp (exclusive) to use for time-range operations
+   */
+  public static long getMaxVisibleTimestamp(Transaction tx) {
+    // NOTE: +1 here because we want read up to writepointer inclusive, but timerange's end is exclusive
+    // however, we also need to guard against overflow in the case write pointer is set to MAX_VALUE
+    return tx.getWritePointer() < Long.MAX_VALUE ?
+        tx.getWritePointer() + 1 : tx.getWritePointer();
+  }
+
+  /**
+   * Creates a "dummy" transaction based on the given txVisibilityState's state.  This is not a "real" transaction in
+   * the sense that it has not been started, data should not be written with it, and it cannot be committed.  However,
+   * this can still be useful for filtering data according to the txVisibilityState's state.  Instead of the actual
+   * write pointer from the txVisibilityState, however, we use {@code Long.MAX_VALUE} to avoid mis-identifying any cells
+   * as being written by this transaction (and therefore visible).
+   * @param txVisibilityState the snapshot of transaction state to build the dummy transaction from
+   * @return a {@link TransactionType#SHORT} transaction carrying the state's read pointer, invalid list and
+   *         in-progress set
+   */
+  public static Transaction createDummyTransaction(TransactionVisibilityState txVisibilityState) {
+    return new Transaction(txVisibilityState.getReadPointer(), Long.MAX_VALUE,
+                           Longs.toArray(txVisibilityState.getInvalid()),
+                           Longs.toArray(txVisibilityState.getInProgress().keySet()),
+                           TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT);
+  }
+
+  /**
+   * Returns the write pointer for the first "short" (not long-running) transaction that is in the in-progress set,
+   * or {@link Transaction#NO_TX_IN_PROGRESS} if none.
+   * "First" follows the map's iteration order; NOTE(review): callers presumably pass a map sorted by write
+   * pointer — confirm.
+   */
+  public static long getFirstShortInProgress(Map<Long, TransactionManager.InProgressTx> inProgress) {
+    long firstShort = Transaction.NO_TX_IN_PROGRESS;
+    for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
+      if (!entry.getValue().isLongRunning()) {
+        firstShort = entry.getKey();
+        break;
+      }
+    }
+    return firstShort;
+  }
+
+  /**
+   * Returns the timestamp for calculating time to live for the given cell timestamp.
+   * This takes into account pre-existing non-transactional cells while calculating the time: their millisecond
+   * timestamps are scaled by {@code TxConstants.MAX_TX_PER_MS}, while transactional timestamps are returned as-is.
+   */
+  public static long getTimestampForTTL(long cellTs) {
+    return isPreExistingVersion(cellTs) ? cellTs * TxConstants.MAX_TX_PER_MS : cellTs;
+  }
+
+  /**
+   * Returns the max TTL for the given TTL values. Returns Long.MAX_VALUE if any of the column families has no TTL set
+   * (a TTL {@code <= 0}), or if the map is empty.
+   */
+  private static long getMaxTTL(Map<byte[], Long> ttlByFamily) {
+    long maxTTL = 0;
+    for (Long familyTTL : ttlByFamily.values()) {
+      maxTTL = Math.max(familyTTL <= 0 ? Long.MAX_VALUE : familyTTL, maxTTL);
+    }
+    return maxTTL == 0 ? Long.MAX_VALUE : maxTTL;
+  }
+
+  /**
+   * Returns true if version was written before table was converted into transactional table, false otherwise.
+   */
+  public static boolean isPreExistingVersion(long version) {
+    return version < MAX_NON_TX_TIMESTAMP;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java b/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java
new file mode 100644
index 0000000..7c1af8e
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.visibility;
+
+import com.google.common.base.Stopwatch;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Default implementation of {@link FenceWait}.
+ */
+public class DefaultFenceWait implements FenceWait {
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultFenceWait.class);
+
+  private final TransactionContext txContext;
+
+  DefaultFenceWait(TransactionContext txContext) {
+    this.txContext = txContext;
+  }
+
+  @Override
+  public void await(long timeout, TimeUnit timeUnit)
+    throws TransactionFailureException, InterruptedException, TimeoutException {
+    Stopwatch stopwatch = new Stopwatch();
+    stopwatch.start();
+    long sleepTimeMicros = timeUnit.toMicros(timeout) / 10;
+    // Have sleep time to be within 1 microsecond and 500 milliseconds
+    sleepTimeMicros = Math.max(Math.min(sleepTimeMicros, 500 * 1000), 1);
+    while (stopwatch.elapsedTime(timeUnit) < timeout) {
+      txContext.start();
+      try {
+        txContext.finish();
+        return;
+      } catch (TransactionFailureException e) {
+        LOG.error("Got exception waiting for fence. Sleeping for {} microseconds", sleepTimeMicros, e);
+        txContext.abort();
+        TimeUnit.MICROSECONDS.sleep(sleepTimeMicros);
+      }
+    }
+    throw new TimeoutException("Timeout waiting for fence");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java b/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java
new file mode 100644
index 0000000..e1fb246
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.visibility;
+
+import org.apache.tephra.TransactionFailureException;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions.
+ */
+public interface FenceWait {
+  /**
+   * Waits until the fence is complete, or till the timeout specified. The fence wait transaction will get re-tried
+   * several times until the timeout.
+   * <p>
+   * If a fence wait times out then it means there are still some readers with in-progress transactions that have not
+   * seen the change. In this case the wait will have to be retried using the same FenceWait object.
+   *
+   * @param timeout Maximum time to wait
+   * @param timeUnit {@link TimeUnit} of {@code timeout}
+   * @throws TransactionFailureException when not able to start fence wait transaction
+   * @throws InterruptedException on any interrupt
+   * @throws TimeoutException when timeout is reached
+   */
+  void await(long timeout, TimeUnit timeUnit)
+    throws TransactionFailureException, InterruptedException, TimeoutException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java b/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java
new file mode 100644
index 0000000..a156d55
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.visibility;
+
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Longs;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link VisibilityFence} used by reader.
+ */
+class ReadFence implements TransactionAware {
+  private final byte[] fenceId;
+  private Transaction tx;
+
+  public ReadFence(byte[] fenceId) {
+    this.fenceId = fenceId;
+  }
+
+  @Override
+  public void startTx(Transaction tx) {
+    this.tx = tx;
+  }
+
+  @Override
+  public void updateTx(Transaction tx) {
+    // Fences only need original transaction
+  }
+
+  @Override
+  public Collection<byte[]> getTxChanges() {
+    if (tx == null) {
+      throw new IllegalStateException("Transaction has not started yet");
+    }
+    return Collections.singleton(Bytes.concat(fenceId, Longs.toByteArray(tx.getTransactionId())));
+  }
+
+  @Override
+  public boolean commitTx() throws Exception {
+    // Nothing to persist
+    return true;
+  }
+
+  @Override
+  public void postTxCommit() {
+    tx = null;
+  }
+
+  @Override
+  public boolean rollbackTx() throws Exception {
+    // Nothing to rollback
+    return true;
+  }
+
+  @Override
+  public String getTransactionAwareName() {
+    return getClass().getSimpleName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java b/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java
new file mode 100644
index 0000000..5d08246
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.visibility;
+
+import org.apache.tephra.TransactionAware;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionSystemClient;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * VisibilityFence is used to ensure that after a given point in time, all readers see an updated change
+ * that got committed.
+ * <p>
+ *
+ * Typically a reader will never conflict with a writer, since a reader only sees changes that were committed when its
+ * transaction started. However, to ensure that after a given point all readers are aware of a change,
+ * we have to introduce a conflict between a reader and a writer that act on the same data concurrently.
+ * <p>
+ *
+ * This is done by the reader indicating that it is interested in changes to a piece of data by using a fence
+ * in its transaction. If there are no changes to the data when the reader tries to commit the transaction
+ * containing the fence, the commit succeeds.
+ * <p>
+ *
+ * On the other hand, a writer updates the same data in a transaction. After the write transaction is committed,
+ * the writer then waits on the fence to ensure that all in-progress readers are aware of this update.
+ * When the wait on the fence returns successfully, it means that
+ * any in-progress readers that have not seen the change will not be allowed to commit anymore. This will
+ * force the readers to start a new transaction, and this ensures that the changes made by the writer are visible
+ * to the readers.
+ * <p>
+ *
+ * If an in-progress reader commits while the writer is waiting on the fence, the wait method will retry
+ * until the given timeout.
+ * <p>
+ *
+ * Hence a successful await on a fence ensures that any reader (using the same fence) that successfully commits from
+ * this point onwards will see the change.
+ *
+ * <p>
+ * Sample reader code:
+ * <pre>
+ *   <code>
+ * TransactionAware fence = VisibilityFence.create(fenceId);
+ * TransactionContext readTxContext = new TransactionContext(txClient, fence, table1, table2, ...);
+ * readTxContext.start();
+ *
+ * // do operations using table1, table2, etc.
+ *
+ * // finally commit
+ * try {
+ *   readTxContext.finish();
+ * } catch (TransactionConflictException e) {
+ *   // handle conflict by aborting and starting over with a new transaction
+ * }
+ *   </code>
+ * </pre>
+ * <p>
+ *
+ * Sample writer code:
+ * <pre>
+ *   <code>
+ * // start transaction
+ * // write change
+ * // commit transaction
+ *
+ * // Now wait on the fence (with the same fenceId as the readers) to ensure that all in-progress readers are
+ * // aware of this change
+ * try {
+ *   FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, txClient);
+ *   fenceWait.await(50000, TimeUnit.MILLISECONDS);
+ * } catch (TimeoutException e) {
+ *   // await timed out, the change may not be visible to all in-progress readers.
+ *   // Application has two options at this point:
+ *   // 1. Revert the write. Re-try the write and fence wait again.
+ *   // 2. Retry only the wait with the same fenceWait object (retry logic is not shown here).
+ * }
+ *   </code>
+ * </pre>
+ *
+ * fenceId in the above samples refers to any id that both the readers and writer know for a given
+ * piece of data. Both readers and writer will have to use the same fenceId to synchronize on a given data.
+ * Typically fenceId uniquely identifies the data in question.
+ * For example, if the data is a table row, the fenceId can be composed of table name and row key.
+ * If the data is a table cell, the fenceId can be composed of table name, row key, and column key.
+ * <p>
+ *
+ * Note that in this implementation, a reader that starts its transaction after the write is committed can still get
+ * a conflict when committing the fence, even though it has seen the latest changes. This happens if a writer
+ * starts and completes an await on the fence while that read transaction is in progress. The reason is that
+ * currently there is no way to determine the commit time of a transaction.
+ */
+public final class VisibilityFence {
+  private VisibilityFence() {
+    // Cannot instantiate this class, all functionality is through static methods.
+  }
+
+  /**
+   * Used by a reader to get a fence that can be added to its transaction context.
+   *
+   * @param fenceId uniquely identifies the data that this fence is used to synchronize.
+   *                  If the data is a table cell then this id can be composed of the table name, row key
+   *                  and column key for the data.
+   * @return {@link TransactionAware} to be added to reader's transaction context.
+   */
+  public static TransactionAware create(byte[] fenceId) {
+    return new ReadFence(fenceId);
+  }
+
+  /**
+   * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions.
+   *
+   * @param fenceId uniquely identifies the data that this fence is used to synchronize.
+   *                  If the data is a table cell then this id can be composed of the table name, row key
+   *                  and column key for the data.
+   * @param txClient client used to build the {@link TransactionContext} in which the fence wait transactions run
+   * @return {@link FenceWait} object; call {@link FenceWait#await} on it to perform the wait
+   */
+  public static FenceWait prepareWait(byte[] fenceId, TransactionSystemClient txClient)
+    throws TransactionFailureException, InterruptedException, TimeoutException {
+    return new DefaultFenceWait(new TransactionContext(txClient, new WriteFence(fenceId)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java b/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java
new file mode 100644
index 0000000..fe59efe
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.visibility;
+
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+
+import java.util.Collection;
+import java.util.TreeSet;
+
+/**
+ * Implementation used by {@link FenceWait} to wait for a {@link VisibilityFence}.
+ */
+class WriteFence implements TransactionAware {
+  private final byte[] fenceId;
+  private Transaction tx;
+  private Collection<byte[]> inProgressChanges;
+
+  public WriteFence(byte[] fenceId) {
+    this.fenceId = fenceId;
+  }
+
+  @Override
+  public void startTx(Transaction tx) {
+    this.tx = tx;
+    if (inProgressChanges == null) {
+      inProgressChanges = new TreeSet<>(UnsignedBytes.lexicographicalComparator());
+      for (long inProgressTx : tx.getInProgress()) {
+        inProgressChanges.add(Bytes.concat(fenceId, Longs.toByteArray(inProgressTx)));
+      }
+    }
+  }
+
+  @Override
+  public void updateTx(Transaction tx) {
+    // Fences only need original transaction
+  }
+
+  @Override
+  public Collection<byte[]> getTxChanges() {
+    if (inProgressChanges == null || tx == null) {
+      throw new IllegalStateException("Transaction has not started yet");
+    }
+    return inProgressChanges;
+  }
+
+  @Override
+  public boolean commitTx() throws Exception {
+    // Nothing to persist
+    return true;
+  }
+
+  @Override
+  public void postTxCommit() {
+    tx = null;
+  }
+
+  @Override
+  public boolean rollbackTx() throws Exception {
+    // Nothing to rollback
+    return true;
+  }
+
+  @Override
+  public String getTransactionAwareName() {
+    return getClass().getSimpleName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java
new file mode 100644
index 0000000..8d95a42
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.zookeeper;
+
+import org.apache.twill.zookeeper.ACLData;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * Plain {@link ACLData} holder: returns exactly the ACL list and {@link Stat} it was constructed with (no copies are
+ * made).
+ */
+final class BasicACLData implements ACLData {
+
+  private final Stat stat;
+  private final List<ACL> acl;
+
+  BasicACLData(List<ACL> acl, Stat stat) {
+    this.stat = stat;
+    this.acl = acl;
+  }
+
+  /** @return the {@link Stat} passed to the constructor */
+  @Override
+  public Stat getStat() {
+    return stat;
+  }
+
+  /** @return the ACL list passed to the constructor */
+  @Override
+  public List<ACL> getACL() {
+    return acl;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java
new file mode 100644
index 0000000..80edddb
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.zookeeper;
+
+import com.google.common.base.Objects;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * {@link NodeChildren} implementation that holds the child names of a ZooKeeper node along with its
+ * {@link Stat}. Two instances are equal to any {@link NodeChildren} with an equal stat and an equal
+ * children list.
+ */
+final class BasicNodeChildren implements NodeChildren {
+
+  private final List<String> childNames;
+  private final Stat nodeStat;
+
+  /**
+   * @param children names of the node's children, stored as given
+   * @param stat the stat of the node
+   */
+  BasicNodeChildren(List<String> children, Stat stat) {
+    this.childNames = children;
+    this.nodeStat = stat;
+  }
+
+  @Override
+  public Stat getStat() {
+    return nodeStat;
+  }
+
+  @Override
+  public List<String> getChildren() {
+    return childNames;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    // instanceof is false for null, so no separate null check is needed.
+    if (!(o instanceof NodeChildren)) {
+      return false;
+    }
+    NodeChildren other = (NodeChildren) o;
+    return nodeStat.equals(other.getStat()) && childNames.equals(other.getChildren());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(childNames, nodeStat);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java
new file mode 100644
index 0000000..5df3475
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.zookeeper;
+
+import com.google.common.base.Objects;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.Arrays;
+
+/**
+ * {@link NodeData} implementation that holds the raw data bytes of a ZooKeeper node along with its
+ * {@link Stat}. Instances are equal to any {@link NodeData} with an equal stat and equal data content.
+ */
+final class BasicNodeData implements NodeData {
+
+  private final byte[] data;
+  private final Stat stat;
+
+  /**
+   * @param data the node's content, stored as given (no defensive copy); may be {@code null}
+   * @param stat the stat of the node
+   */
+  BasicNodeData(byte[] data, Stat stat) {
+    this.data = data;
+    this.stat = stat;
+  }
+
+  @Override
+  public Stat getStat() {
+    return stat;
+  }
+
+  @Override
+  public byte[] getData() {
+    return data;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof NodeData)) {
+      return false;
+    }
+
+    // Cast to the interface, not to BasicNodeData: the instanceof check above admits any NodeData
+    // implementation, and a cast to the concrete class would throw ClassCastException for those.
+    NodeData that = (NodeData) o;
+
+    return Objects.equal(stat, that.getStat()) && Arrays.equals(data, that.getData());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash the byte[] by content so the result agrees with equals(), which uses Arrays.equals.
+    // Passing the array itself would hash its identity and break the equals/hashCode contract.
+    return Objects.hashCode(Arrays.hashCode(data), stat);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java b/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java
new file mode 100644
index 0000000..2bc7a6a
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.zookeeper;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.zookeeper.SettableOperationFuture;
+import org.apache.twill.zookeeper.ACLData;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+
+/**
+ * The implementation of {@link ZKClientService}, backed by a single {@link ZooKeeper} client.
+ * Operation callbacks and wrapped watchers are dispatched on a single-threaded event executor created
+ * in {@link #doStart()}. When the session expires while the service is running, a new
+ * {@link ZooKeeper} client is created to replace the expired one.
+ */
+public class TephraZKClientService extends AbstractService implements ZKClientService, Watcher {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TephraZKClientService.class);
+
+  private final String zkStr;
+  private final int sessionTimeout;
+  private final List<Watcher> connectionWatchers;
+  private final Multimap<String, byte[]> authInfos;
+  private final AtomicReference<ZooKeeper> zooKeeper;
+  private final Runnable stopTask;
+  private ExecutorService eventExecutor;
+
+  /**
+   * Create a new instance.
+   * @param zkStr ZooKeeper connection string
+   * @param sessionTimeout timeout in milliseconds
+   * @param connectionWatcher watcher to set; may be {@code null}, in which case it is ignored
+   * @param authInfos authentication info (scheme to auth bytes) added to every ZooKeeper client created;
+   *                  deep-copied on construction
+   */
+  public TephraZKClientService(String zkStr, int sessionTimeout,
+                               Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
+    this.zkStr = zkStr;
+    this.sessionTimeout = sessionTimeout;
+    this.connectionWatchers = new CopyOnWriteArrayList<>();
+    this.authInfos = copyAuthInfo(authInfos);
+    addConnectionWatcher(connectionWatcher);
+
+    this.zooKeeper = new AtomicReference<>();
+    this.stopTask = createStopTask();
+  }
+
+  @Override
+  public Long getSessionId() {
+    ZooKeeper zk = zooKeeper.get();
+    return zk == null ? null : zk.getSessionId();
+  }
+
+  @Override
+  public String getConnectString() {
+    return zkStr;
+  }
+
+  @Override
+  public Cancellable addConnectionWatcher(final Watcher watcher) {
+    if (watcher == null) {
+      return new Cancellable() {
+        @Override
+        public void cancel() {
+          // No-op
+        }
+      };
+    }
+
+    // Connection watchers are already invoked inside the event thread,
+    // hence no need to wrap the watcher again.
+    connectionWatchers.add(watcher);
+    return new Cancellable() {
+      @Override
+      public void cancel() {
+        connectionWatchers.remove(watcher);
+      }
+    };
+  }
+
+  @Override
+  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
+    return create(path, data, createMode, true);
+  }
+
+  @Override
+  public OperationFuture<String> create(String path, @Nullable byte[] data,
+                                        CreateMode createMode, boolean createParent) {
+    return create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+  }
+
+  @Override
+  public OperationFuture<String> create(String path, @Nullable byte[] data,
+                                        CreateMode createMode, Iterable<ACL> acl) {
+    return create(path, data, createMode, true, acl);
+  }
+
+  @Override
+  public OperationFuture<Stat> exists(String path) {
+    return exists(path, null);
+  }
+
+  @Override
+  public OperationFuture<NodeChildren> getChildren(String path) {
+    return getChildren(path, null);
+  }
+
+  @Override
+  public OperationFuture<NodeData> getData(String path) {
+    return getData(path, null);
+  }
+
+  @Override
+  public OperationFuture<Stat> setData(String path, byte[] data) {
+    return setData(path, data, -1);
+  }
+
+  @Override
+  public OperationFuture<String> delete(String path) {
+    return delete(path, -1);
+  }
+
+  @Override
+  public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl) {
+    return setACL(path, acl, -1);
+  }
+
+  @Override
+  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
+                                        boolean createParent, Iterable<ACL> acl) {
+    return doCreate(path, data, createMode, createParent, ImmutableList.copyOf(acl), false);
+  }
+
+  /**
+   * Creates the given node, optionally creating missing parents first.
+   * <p>
+   * Missing parents are created as persistent nodes with {@link ZooDefs.Ids#OPEN_ACL_UNSAFE}. After
+   * the parent chain has been created, the original node creation is attempted exactly once more.
+   *
+   * @param ignoreNodeExists if {@code true}, a {@code NODEEXISTS} failure is treated as success
+   *                         (used for parent creation)
+   */
+  private OperationFuture<String> doCreate(final String path,
+                                           @Nullable final byte[] data,
+                                           final CreateMode createMode,
+                                           final boolean createParent,
+                                           final List<ACL> acl,
+                                           final boolean ignoreNodeExists) {
+    final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, createFuture);
+    if (!createParent) {
+      return createFuture;
+    }
+
+    // If parent creation is requested, return a different future
+    final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor);
+    // Watch for changes in the original future
+    Futures.addCallback(createFuture, new FutureCallback<String>() {
+      @Override
+      public void onSuccess(String path) {
+        // Propagate if creation was successful
+        result.set(path);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // Stop here if the failure completed the result (error, or an ignorable NODEEXISTS)
+        if (updateFailureResult(t, result, path, ignoreNodeExists)) {
+          return;
+        }
+        // NONODE: create the parent node
+        String parentPath = getParent(path);
+        if (parentPath.isEmpty()) {
+          result.setException(t);
+          return;
+        }
+        // Watch for parent creation complete. Parent is created with the unsafe ACL.
+        Futures.addCallback(doCreate(parentPath, null, CreateMode.PERSISTENT,
+                                     true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), new FutureCallback<String>() {
+          @Override
+          public void onSuccess(String parentPath) {
+            // Create the requested path again
+            Futures.addCallback(
+              doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() {
+                @Override
+                public void onSuccess(String pathResult) {
+                  result.set(pathResult);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                  // The creation is not retried again, so a NONODE here (e.g. the parent was removed
+                  // concurrently) must fail the result; otherwise the result would never complete.
+                  if (!updateFailureResult(t, result, path, ignoreNodeExists)) {
+                    result.setException(t);
+                  }
+                }
+              });
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            result.setException(t);
+          }
+        });
+      }
+
+      /**
+       * Updates the result future based on the given {@link Throwable}.
+       * @param t Cause of the failure
+       * @param result Future to be updated
+       * @param path Request path for the operation
+       * @param ignoreNodeExists whether {@code NODEEXISTS} should complete the result successfully
+       * @return {@code true} if {@code result} has been completed (with an exception, or successfully for an
+       *         ignored {@code NODEEXISTS}) and needs no further handling; {@code false} if the failure is
+       *         {@code NONODE} and the result was left untouched for the caller to handle.
+       */
+      private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result,
+                                          String path, boolean ignoreNodeExists) {
+        // Propagate if there is error
+        if (!(t instanceof KeeperException)) {
+          result.setException(t);
+          return true;
+        }
+        KeeperException.Code code = ((KeeperException) t).code();
+        // Node already exists, simply return success if it allows for ignoring node exists (for parent node creation).
+        if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) {
+          // The requested path could be used because it only applies to non-sequential node
+          result.set(path);
+          // The result is complete; the caller must not go on to create parents.
+          return true;
+        }
+        if (code != KeeperException.Code.NONODE) {
+          result.setException(t);
+          return true;
+        }
+        return false;
+      }
+
+      /**
+       * Gets the parent of the given path.
+       * @param path Path for computing its parent
+       * @return Parent of the given path, or empty string if the given path is the root path already.
+       */
+      private String getParent(String path) {
+        String parentPath = path.substring(0, path.lastIndexOf('/'));
+        return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath;
+      }
+    });
+
+    return result;
+  }
+
+  @Override
+  public OperationFuture<Stat> exists(String path, Watcher watcher) {
+    SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result);
+    return result;
+  }
+
+  @Override
+  public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
+    SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result);
+    return result;
+  }
+
+  @Override
+  public OperationFuture<NodeData> getData(String path, Watcher watcher) {
+    SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result);
+
+    return result;
+  }
+
+  @Override
+  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+    SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor);
+    getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result);
+    return result;
+  }
+
+  @Override
+  public OperationFuture<String> delete(String deletePath, int version) {
+    SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor);
+    getZooKeeper().delete(deletePath, version, Callbacks.VOID, result);
+    return result;
+  }
+
+  @Override
+  public OperationFuture<ACLData> getACL(String path) {
+    SettableOperationFuture<ACLData> result = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().getACL(path, new Stat(), Callbacks.ACL, result);
+    return result;
+  }
+
+  @Override
+  public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
+    SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().setACL(path, ImmutableList.copyOf(acl), version, Callbacks.STAT, result);
+    return result;
+  }
+
+  @Override
+  public Supplier<ZooKeeper> getZooKeeperSupplier() {
+    return new Supplier<ZooKeeper>() {
+      @Override
+      public ZooKeeper get() {
+        return getZooKeeper();
+      }
+    };
+  }
+
+  /**
+   * Creates the event executor and the first {@link ZooKeeper} client. The service transitions to
+   * RUNNING only once the client reports {@code SyncConnected} (see {@link #process(WatchedEvent)}).
+   */
+  @Override
+  protected void doStart() {
+    // A single thread executor for all events
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                                                         new LinkedBlockingQueue<Runnable>(),
+                                                         Threads.createDaemonThreadFactory("zk-client-EventThread"));
+    // Just discard the execution if the executor is closed
+    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+    eventExecutor = executor;
+
+    try {
+      zooKeeper.set(createZooKeeper());
+    } catch (IOException e) {
+      // No client was created, so no events can arrive; release the event thread instead of leaking it,
+      // since a FAILED service is never stopped and doStop() would not run.
+      executor.shutdown();
+      notifyFailed(e);
+    }
+  }
+
+  @Override
+  protected void doStop() {
+    // Submit a task to the executor to make sure all pending events in the executor are fired before
+    // transiting this Service into STOPPED state
+    eventExecutor.submit(stopTask);
+    eventExecutor.shutdown();
+  }
+
+  /**
+   * @return Current {@link ZooKeeper} client.
+   * @throws IllegalArgumentException if there is no current client (not started, or already stopped)
+   */
+  private ZooKeeper getZooKeeper() {
+    ZooKeeper zk = zooKeeper.get();
+    Preconditions.checkArgument(zk != null, "Not connected to zooKeeper.");
+    return zk;
+  }
+
+  /**
+   * Wraps the given watcher to be called from the event executor.
+   * @param watcher Watcher to be wrapped
+   * @return The wrapped Watcher
+   */
+  private Watcher wrapWatcher(final Watcher watcher) {
+    if (watcher == null) {
+      return null;
+    }
+    return new Watcher() {
+      @Override
+      public void process(final WatchedEvent event) {
+        if (eventExecutor.isShutdown()) {
+          LOG.debug("Already shutdown. Discarding event: {}", event);
+          return;
+        }
+        eventExecutor.execute(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              watcher.process(event);
+            } catch (Throwable t) {
+              LOG.error("Watcher throws exception.", t);
+            }
+          }
+        });
+      }
+    };
+  }
+
+  /**
+   * Creates a deep copy of the given authInfos multimap.
+   */
+  private Multimap<String, byte[]> copyAuthInfo(Multimap<String, byte[]> authInfos) {
+    Multimap<String, byte[]> result = ArrayListMultimap.create();
+
+    for (Map.Entry<String, byte[]> entry : authInfos.entries()) {
+      byte[] info = entry.getValue();
+      result.put(entry.getKey(), info == null ? null : Arrays.copyOf(info, info.length));
+    }
+
+    return result;
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    State state = state();
+    if (state == State.TERMINATED || state == State.FAILED) {
+      return;
+    }
+
+    try {
+      if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) {
+        LOG.debug("Connected to ZooKeeper: {}", zkStr);
+        notifyStarted();
+        return;
+      }
+      if (event.getState() == Event.KeeperState.Expired) {
+        LOG.info("ZooKeeper session expired: {}", zkStr);
+
+        // When connection expired, simply reconnect again
+        if (state != State.RUNNING) {
+          return;
+        }
+        eventExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            // Only reconnect if the current state is running
+            if (state() != State.RUNNING) {
+              return;
+            }
+            try {
+              LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
+              closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
+            } catch (IOException e) {
+              notifyFailed(e);
+            }
+          }
+        });
+      }
+    } finally {
+      // Connection-level events (type None) are always forwarded to the connection watchers
+      if (event.getType() == Event.EventType.None) {
+        for (Watcher connectionWatcher : connectionWatchers) {
+          connectionWatcher.process(event);
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates a new ZooKeeper connection.
+   */
+  private ZooKeeper createZooKeeper() throws IOException {
+    ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this));
+    for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
+      zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
+    }
+    return zk;
+  }
+
+  /**
+   * Closes the given {@link ZooKeeper} if it is not null. If there is InterruptedException,
+   * it will get logged.
+   */
+  private void closeZooKeeper(@Nullable ZooKeeper zk) {
+    try {
+      if (zk != null) {
+        zk.close();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted when closing ZooKeeper", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Creates a {@link Runnable} task that will get executed in the event executor for transiting this
+   * Service into STOPPED state.
+   */
+  private Runnable createStopTask() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // Close the ZK connection in this task will make sure if there is ZK connection created
+          // after doStop() was called but before this task has been executed is also closed.
+          // It is possible to happen when the following sequence happens:
+          //
+          // 1. session expired, hence the expired event is triggered
+          // 2. The reconnect task executed. With Service.state() == RUNNING, it creates a new ZK client
+          // 3. Service.stop() gets called, Service.state() changed to STOPPING
+          // 4. The new ZK client created from the reconnect thread update the zooKeeper with the new one
+          closeZooKeeper(zooKeeper.getAndSet(null));
+          notifyStopped();
+        } catch (Exception e) {
+          notifyFailed(e);
+        }
+      }
+    };
+  }
+
+  /**
+   * Collection of generic callbacks that simply reflect results into OperationFuture.
+   */
+  private static final class Callbacks {
+    static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, String name) {
+        SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set((name == null || name.isEmpty()) ? path : name);
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+
+    static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, Stat stat) {
+        SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set(stat);
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+
+    /**
+     * A stat callback that treats NONODE as success.
+     */
+    static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, Stat stat) {
+        SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) {
+          result.set(stat);
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+
+    static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+        SettableOperationFuture<NodeChildren> result = (SettableOperationFuture<NodeChildren>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set(new BasicNodeChildren(children, stat));
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+
+    static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+        SettableOperationFuture<NodeData> result = (SettableOperationFuture<NodeData>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set(new BasicNodeData(data, stat));
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+
+    static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx) {
+        SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set(result.getRequestPath());
+          return;
+        }
+        // Otherwise, it is an error
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+
+    static final AsyncCallback.ACLCallback ACL = new AsyncCallback.ACLCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
+        SettableOperationFuture<ACLData> result = (SettableOperationFuture<ACLData>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set(new BasicACLData(acl, stat));
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/thrift/transaction.thrift
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift
index 39e6cec..0e3d712 100644
--- a/tephra-core/src/main/thrift/transaction.thrift
+++ b/tephra-core/src/main/thrift/transaction.thrift
@@ -16,7 +16,7 @@
 # limitations under the License.
 #
 
-namespace java co.cask.tephra.distributed.thrift
+namespace java org.apache.tephra.distributed.thrift
 
 enum TTransactionType {
   SHORT = 1,

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java
deleted file mode 100644
index cf01d25..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.distributed.TransactionService;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Scopes;
-import com.google.inject.util.Modules;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Runs the {@link TransactionSystemTest} suite against a {@link TransactionService} started in the
- * test JVM, using an in-memory ZooKeeper server and {@link InMemoryTransactionStateStorage}.
- */
-public class ThriftTransactionSystemTest extends TransactionSystemTest {
-  private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);
-  
-  private static InMemoryZKServer zkServer;
-  private static ZKClientService zkClientService;
-  private static TransactionService txService;
-  private static TransactionStateStorage storage;
-  private static TransactionSystemClient txClient;
-
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-  
-  @BeforeClass
-  public static void start() throws Exception {
-    zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
-    zkServer.startAndWait();
-
-    Configuration conf = new Configuration();
-    conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
-    conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
-    // "n-times" retry strategy with a single attempt; presumably so client failures surface immediately
-    conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
-    conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
-
-    // Distributed modules, with state storage overridden to an in-memory singleton
-    Injector injector = Guice.createInjector(
-      new ConfigModule(conf),
-      new ZKModule(),
-      new DiscoveryModules().getDistributedModules(),
-      Modules.override(new TransactionModules().getDistributedModules())
-        .with(new AbstractModule() {
-          @Override
-          protected void configure() {
-            bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
-          }
-        }),
-      new TransactionClientModule()
-    );
-
-    zkClientService = injector.getInstance(ZKClientService.class);
-    zkClientService.startAndWait();
-
-    // start a tx server
-    txService = injector.getInstance(TransactionService.class);
-    storage = injector.getInstance(TransactionStateStorage.class);
-    txClient = injector.getInstance(TransactionSystemClient.class);
-    try {
-      LOG.info("Starting transaction service");
-      txService.startAndWait();
-    } catch (Exception e) {
-      // NOTE(review): a start failure is only logged; the tests then run against a service that did not start
-      LOG.error("Failed to start service: ", e);
-    }
-  }
-  
-  // Reset transaction state through the client before each test
-  @Before
-  public void reset() throws Exception {
-    getClient().resetState();
-  }
-  
-  // Stop services in reverse order of their dependencies
-  @AfterClass
-  public static void stop() throws Exception {
-    txService.stopAndWait();
-    storage.stopAndWait();
-    zkClientService.stopAndWait();
-    zkServer.stopAndWait();
-  }
-  
-  @Override
-  protected TransactionSystemClient getClient() throws Exception {
-    return txClient;
-  }
-
-  @Override
-  protected TransactionStateStorage getStateStorage() throws Exception {
-    return storage;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java
deleted file mode 100644
index de02f80..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.distributed.TransactionService;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Scopes;
-import com.google.inject.util.Modules;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.util.concurrent.TimeUnit;
-
-public class TransactionAdminTest {
-  private static final Logger LOG = LoggerFactory.getLogger(TransactionAdminTest.class);
-  
-  private static Configuration conf;
-  private static InMemoryZKServer zkServer;
-  private static ZKClientService zkClientService;
-  private static TransactionService txService;
-  private static TransactionSystemClient txClient;
-
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @BeforeClass
-  public static void start() throws Exception {
-    zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
-    zkServer.startAndWait();
-
-    conf = new Configuration();
-    conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
-    conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
-    conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
-    conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
-
-    Injector injector = Guice.createInjector(
-      new ConfigModule(conf),
-      new ZKModule(),
-      new DiscoveryModules().getDistributedModules(),
-      Modules.override(new TransactionModules().getDistributedModules())
-        .with(new AbstractModule() {
-          @Override
-          protected void configure() {
-            bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
-          }
-        }),
-      new TransactionClientModule()
-    );
-
-    zkClientService = injector.getInstance(ZKClientService.class);
-    zkClientService.startAndWait();
-
-    // start a tx server
-    txService = injector.getInstance(TransactionService.class);
-    txClient = injector.getInstance(TransactionSystemClient.class);
-    try {
-      LOG.info("Starting transaction service");
-      txService.startAndWait();
-    } catch (Exception e) {
-      LOG.error("Failed to start service: ", e);
-    }
-  }
-
-  @Before
-  public void reset() throws Exception {
-    txClient.resetState();
-  }
-
-  @AfterClass
-  public static void stop() throws Exception {
-    txService.stopAndWait();
-    zkClientService.stopAndWait();
-    zkServer.stopAndWait();
-  }
-
-  @Test
-  public void testPrintUsage() throws Exception {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    ByteArrayOutputStream err = new ByteArrayOutputStream();
-    TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(out), new PrintStream(err));
-    int status = txAdmin.doMain(new String[0], conf);
-    Assert.assertEquals(1, status);
-    //noinspection ConstantConditions
-    Assert.assertTrue(err.toString("UTF-8").startsWith("Usage:"));
-    Assert.assertEquals(0, out.toByteArray().length);
-  }
-  
-  @Test
-  public void testTruncateInvalidTx() throws Exception {
-    Transaction tx1 = txClient.startLong();
-    Transaction tx2 = txClient.startShort();
-    txClient.invalidate(tx1.getTransactionId());
-    txClient.invalidate(tx2.getTransactionId());
-    Assert.assertEquals(2, txClient.getInvalidSize());
-
-    TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(System.out), new PrintStream(System.err));
-    int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx", String.valueOf(tx2.getTransactionId())}, conf);
-    Assert.assertEquals(0, status);
-    Assert.assertEquals(1, txClient.getInvalidSize());
-  }
-
-  @Test
-  public void testTruncateInvalidTxBefore() throws Exception {
-    Transaction tx1 = txClient.startLong();
-    TimeUnit.MILLISECONDS.sleep(1);
-    long beforeTx2 = System.currentTimeMillis();
-    Transaction tx2 = txClient.startLong();
-
-    // Try before invalidation
-    Assert.assertEquals(0, txClient.getInvalidSize());
-    TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(System.out), new PrintStream(System.err));
-    int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf);
-    // Assert command failed due to in-progress transactions
-    Assert.assertEquals(1, status);
-    // Assert no change to invalid size
-    Assert.assertEquals(0, txClient.getInvalidSize());
-
-    txClient.invalidate(tx1.getTransactionId());
-    txClient.invalidate(tx2.getTransactionId());
-    Assert.assertEquals(2, txClient.getInvalidSize());
-
-    status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf);
-    Assert.assertEquals(0, status);
-    Assert.assertEquals(1, txClient.getInvalidSize());
-  }
-
-  @Test
-  public void testGetInvalidTxSize() throws Exception {
-    Transaction tx1 = txClient.startShort();
-    txClient.startLong();
-    txClient.invalidate(tx1.getTransactionId());
-
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    ByteArrayOutputStream err = new ByteArrayOutputStream();
-    TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(out), new PrintStream(err));
-    int status = txAdmin.doMain(new String[]{"--get-invalid-tx-size"}, conf);
-    Assert.assertEquals(0, status);
-    //noinspection ConstantConditions
-    Assert.assertTrue(out.toString("UTF-8").contains("Invalid list size: 1\n"));
-  }
-}