You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:56 UTC

[50/51] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java b/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java
deleted file mode 100644
index 11a5b55..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Objects;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Bytes;
-import com.google.common.primitives.UnsignedBytes;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-/**
- * Base class for all the common parts of the HBase version-specific {@code TransactionAwareHTable}
- * implementations.
- */
-public abstract class AbstractTransactionAwareTable implements TransactionAware {
-  protected final TransactionCodec txCodec;
-  // map of write pointers to change set associated with each
-  protected final Map<Long, Set<ActionChange>> changeSets;
-  protected final TxConstants.ConflictDetection conflictLevel;
-  protected Transaction tx;
-  protected boolean allowNonTransactional;
-
-  public AbstractTransactionAwareTable(TxConstants.ConflictDetection conflictLevel, boolean allowNonTransactional) {
-    this.conflictLevel = conflictLevel;
-    this.allowNonTransactional = allowNonTransactional;
-    this.txCodec = new TransactionCodec();
-    this.changeSets = Maps.newHashMap();
-  }
-
-  /**
-   * True if the instance allows non-transaction operations.
-   * @return
-   */
-  public boolean getAllowNonTransactional() {
-    return this.allowNonTransactional;
-  }
-
-  /**
-   * Set whether the instance allows non-transactional operations.
-   * @param allowNonTransactional
-   */
-  public void setAllowNonTransactional(boolean allowNonTransactional) {
-    this.allowNonTransactional = allowNonTransactional;
-  }
-
-  @Override
-  public void startTx(Transaction tx) {
-    this.tx = tx;
-  }
-
-  @Override
-  public void updateTx(Transaction tx) {
-    this.tx = tx;
-  }
-
-  @Override
-  public Collection<byte[]> getTxChanges() {
-    if (conflictLevel == TxConstants.ConflictDetection.NONE) {
-      return Collections.emptyList();
-    }
-
-    Collection<byte[]> txChanges = new TreeSet<byte[]>(UnsignedBytes.lexicographicalComparator());
-    for (Set<ActionChange> changeSet : changeSets.values()) {
-      for (ActionChange change : changeSet) {
-        txChanges.add(getChangeKey(change.getRow(), change.getFamily(), change.getQualifier()));
-      }
-    }
-    return txChanges;
-  }
-
-  public byte[] getChangeKey(byte[] row, byte[] family, byte[] qualifier) {
-    byte[] key;
-    switch (conflictLevel) {
-      case ROW:
-        key = Bytes.concat(getTableKey(), row);
-        break;
-      case COLUMN:
-        key = Bytes.concat(getTableKey(), row, family, qualifier);
-        break;
-      case NONE:
-        throw new IllegalStateException("NONE conflict detection does not support change keys");
-      default:
-        throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
-    }
-    return key;
-  }
-
-  @Override
-  public boolean commitTx() throws Exception {
-    return doCommit();
-  }
-
-  /**
-   * Commits any pending writes by flushing the wrapped {@code HTable} instance.
-   */
-  protected abstract boolean doCommit() throws IOException;
-
-  @Override
-  public void postTxCommit() {
-    tx = null;
-    changeSets.clear();
-  }
-
-  @Override
-  public String getTransactionAwareName() {
-    return new String(getTableKey(), Charsets.UTF_8);
-  }
-
-  /**
-   * Returns the table name to use as a key prefix for the transaction change set.
-   */
-  protected abstract byte[] getTableKey();
-
-  @Override
-  public boolean rollbackTx() throws Exception {
-    return doRollback();
-  }
-
-  /**
-   * Rolls back any persisted changes from the transaction by issuing offsetting deletes to the
-   * wrapped {@code HTable} instance.  How this is handled will depend on the delete API exposed
-   * by the specific version of HBase.
-   */
-  protected abstract boolean doRollback() throws Exception;
-
-  protected void addToChangeSet(byte[] row, byte[] family, byte[] qualifier) {
-    long currentWritePointer = tx.getWritePointer();
-    Set<ActionChange> changeSet = changeSets.get(currentWritePointer);
-    if (changeSet == null) {
-      changeSet = Sets.newHashSet();
-      changeSets.put(currentWritePointer, changeSet);
-    }
-    switch (conflictLevel) {
-      case ROW:
-      case NONE:
-        // with ROW or NONE conflict detection, we still need to track changes per-family, since this
-        // is the granularity at which we will issue deletes for rollback
-        changeSet.add(new ActionChange(row, family));
-        break;
-      case COLUMN:
-        changeSet.add(new ActionChange(row, family, qualifier));
-        break;
-      default:
-        throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
-    }
-  }
-
-  /**
-   * Record of each transaction that causes a change. This reference is used to rollback
-   * any operation upon failure.
-   */
-  protected class ActionChange {
-    private final byte[] row;
-    private final byte[] family;
-    private final byte[] qualifier;
-
-    public ActionChange(byte[] row, byte[] family) {
-      this(row, family, null);
-    }
-
-    public ActionChange(byte[] row, byte[] family, byte[] qualifier) {
-      this.row = row;
-      this.family = family;
-      this.qualifier = qualifier;
-    }
-
-    public byte[] getRow() {
-      return row;
-    }
-
-    public byte[] getFamily() {
-      return family;
-    }
-
-    public byte[] getQualifier() {
-      return qualifier;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == null || o.getClass() != this.getClass()) {
-        return false;
-      }
-
-      if (o == this) {
-        return true;
-      }
-
-      ActionChange other = (ActionChange) o;
-      return Objects.equal(this.row, other.row) &&
-             Objects.equal(this.family, other.family) &&
-             Objects.equal(this.qualifier, other.qualifier);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = Arrays.hashCode(row);
-      result = 31 * result + (family != null ? Arrays.hashCode(family) : 0);
-      result = 31 * result + (qualifier != null ? Arrays.hashCode(qualifier) : 0);
-      return result;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java b/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java
deleted file mode 100644
index f2a28a8..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-
-/**
- * Provides implementation of asynchronous methods of {@link TransactionExecutor} by delegating their execution
- * to respective synchronous methods via provided {@link ExecutorService}.
- */
-public abstract class AbstractTransactionExecutor implements TransactionExecutor {
-  private final ListeningExecutorService executorService;
-
-  protected AbstractTransactionExecutor(ExecutorService executorService) {
-    this.executorService = MoreExecutors.listeningDecorator(executorService);
-  }
-
-  @Override
-  public <I, O> O executeUnchecked(Function<I, O> function, I input) {
-    try {
-      return execute(function, input);
-    } catch (TransactionFailureException e) {
-      throw Throwables.propagate(e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public <I> void executeUnchecked(Procedure<I> procedure, I input) {
-    try {
-      execute(procedure, input);
-    } catch (TransactionFailureException e) {
-      throw Throwables.propagate(e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public <O> O executeUnchecked(Callable<O> callable) {
-    try {
-      return execute(callable);
-    } catch (TransactionFailureException e) {
-      throw Throwables.propagate(e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public void executeUnchecked(Subroutine subroutine) {
-    try {
-      execute(subroutine);
-    } catch (TransactionFailureException e) {
-      throw Throwables.propagate(e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public <I, O> ListenableFuture<O> submit(final Function<I, O> function, final I input) {
-    return executorService.submit(new Callable<O>() {
-      @Override
-      public O call() throws Exception {
-        return execute(function, input);
-      }
-    });
-  }
-
-  @Override
-  public <I> ListenableFuture<?> submit(final Procedure<I> procedure, final I input) {
-    return executorService.submit(new Callable<Object>() {
-      @Override
-      public I call() throws Exception {
-        execute(procedure, input);
-        return null;
-      }
-    });
-  }
-
-  @Override
-  public <O> ListenableFuture<O> submit(final Callable<O> callable) {
-    return executorService.submit(new Callable<O>() {
-      @Override
-      public O call() throws Exception {
-        return execute(callable);
-      }
-    });
-  }
-
-  @Override
-  public ListenableFuture<?> submit(final Subroutine subroutine) {
-    return executorService.submit(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        execute(subroutine);
-        return null;
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/ChangeId.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/ChangeId.java b/tephra-core/src/main/java/co/cask/tephra/ChangeId.java
deleted file mode 100644
index 8413256..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/ChangeId.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import java.util.Arrays;
-
-/**
- * Represents a row key from a data set changed as part of a transaction.
- */
-public final class ChangeId {
-  private final byte[] key;
-  private final int hash;
-
-  public ChangeId(byte[] bytes) {
-    key = bytes;
-    hash = Arrays.hashCode(bytes);
-  }
-
-  public byte[] getKey() {
-    return key;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o == this) {
-      return true;
-    }
-    if (o == null || o.getClass() != ChangeId.class) {
-      return false;
-    }
-    ChangeId other = (ChangeId) o;
-    return hash == other.hash && Arrays.equals(key, other.key);
-  }
-
-  @Override
-  public int hashCode() {
-    return hash;
-  }
-
-  @Override
-  public String toString() {
-    return toStringBinary(key, 0, key.length);
-  }
-
-  // Copy from Bytes.toStringBinary so that we don't need direct dependencies on Bytes.
-  private String toStringBinary(byte [] b, int off, int len) {
-    StringBuilder result = new StringBuilder();
-    for (int i = off; i < off + len; ++i) {
-      int ch = b[i] & 0xFF;
-      if ((ch >= '0' && ch <= '9')
-       || (ch >= 'A' && ch <= 'Z')
-       || (ch >= 'a' && ch <= 'z')
-       || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
-        result.append((char) ch);
-      } else {
-        result.append(String.format("\\x%02X", ch));
-      }
-    }
-    return result.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java b/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java
deleted file mode 100644
index f85dd6b..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utility class that encapsulates the transaction life cycle over a given set of
- * transaction-aware datasets. The executor can be reused across multiple invocations
- * of the execute() method. However, it is not thread-safe for concurrent execution.
- * <p>
- *   Transaction execution will be retries according to specified in constructor {@link RetryStrategy}.
- *   By default {@link RetryOnConflictStrategy} is used with max 20 retries and 100 ms between retries.
- * </p>
- */
-public class DefaultTransactionExecutor extends AbstractTransactionExecutor {
-
-  private final Collection<TransactionAware> txAwares;
-  private final TransactionSystemClient txClient;
-  private final RetryStrategy retryStrategy;
-
-  /**
-   * Convenience constructor, has same affect as {@link #DefaultTransactionExecutor(TransactionSystemClient, Iterable)}
-   */
-  public DefaultTransactionExecutor(TransactionSystemClient txClient, TransactionAware... txAwares) {
-    this(txClient, Arrays.asList(txAwares));
-  }
-
-
-  public DefaultTransactionExecutor(TransactionSystemClient txClient,
-                                    Iterable<TransactionAware> txAwares,
-                                    RetryStrategy retryStrategy) {
-
-    super(MoreExecutors.sameThreadExecutor());
-    this.txAwares = ImmutableList.copyOf(txAwares);
-    this.txClient = txClient;
-    this.retryStrategy = retryStrategy;
-  }
-
-  /**
-   * Constructor for a transaction executor.
-   */
-  @Inject
-  public DefaultTransactionExecutor(TransactionSystemClient txClient, @Assisted Iterable<TransactionAware> txAwares) {
-    this(txClient, txAwares, RetryStrategies.retryOnConflict(20, 100));
-  }
-
-  @Override
-  public <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException {
-    return executeWithRetry(function, input);
-  }
-
-  @Override
-  public <I> void execute(final Procedure<I> procedure, I input)
-    throws TransactionFailureException, InterruptedException {
-
-    execute(new Function<I, Void>() {
-      @Override
-      public Void apply(I input) throws Exception {
-        procedure.apply(input);
-        return null;
-      }
-    }, input);
-  }
-
-  @Override
-  public <O> O execute(final Callable<O> callable) throws TransactionFailureException, InterruptedException {
-    return execute(new Function<Void, O>() {
-      @Override
-      public O apply(Void input) throws Exception {
-        return callable.call();
-      }
-    }, null);
-  }
-
-  @Override
-  public void execute(final Subroutine subroutine) throws TransactionFailureException, InterruptedException {
-    execute(new Function<Void, Void>() {
-      @Override
-      public Void apply(Void input) throws Exception {
-        subroutine.apply();
-        return null;
-      }
-    }, null);
-  }
-
-  private <I, O> O executeWithRetry(Function<I, O> function, I input)
-    throws TransactionFailureException, InterruptedException {
-
-    int retries = 0;
-    while (true) {
-      try {
-        return executeOnce(function, input);
-      } catch (TransactionFailureException e) {
-        long delay = retryStrategy.nextRetry(e, ++retries);
-
-        if (delay < 0) {
-          throw e;
-        }
-
-        if (delay > 0) {
-          TimeUnit.MILLISECONDS.sleep(delay);
-        }
-      }
-    }
-
-  }
-
-  private <I, O> O executeOnce(Function<I, O> function, I input) throws TransactionFailureException {
-    TransactionContext txContext = new TransactionContext(txClient, txAwares);
-    txContext.start();
-    O o = null;
-    try {
-      o = function.apply(input);
-    } catch (Throwable e) {
-      txContext.abort(new TransactionFailureException("Transaction function failure for transaction. ", e));
-      // abort will throw
-    }
-    // will throw if smth goes wrong
-    txContext.finish();
-    return o;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java b/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java
deleted file mode 100644
index b4576ee..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Thrown when truncate invalid list is called with a time, and when there are in-progress transactions that
- * were started before the given time.
- */
-public class InvalidTruncateTimeException extends Exception {
-  public InvalidTruncateTimeException(String s) {
-    super(s);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java b/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java
deleted file mode 100644
index 30e9ceb..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Does no retries
- */
-public class NoRetryStrategy implements RetryStrategy {
-  public static final RetryStrategy INSTANCE = new NoRetryStrategy();
-
-  private NoRetryStrategy() {}
-
-  @Override
-  public long nextRetry(TransactionFailureException reason, int failureCount) {
-    return -1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java b/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java
deleted file mode 100644
index 04b9f75..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Retries transaction execution when transaction fails with {@link TransactionConflictException}.
- */
-public class RetryOnConflictStrategy implements RetryStrategy {
-  private final int maxRetries;
-  private final long retryDelay;
-
-  public RetryOnConflictStrategy(int maxRetries, long retryDelay) {
-    this.maxRetries = maxRetries;
-    this.retryDelay = retryDelay;
-  }
-
-  @Override
-  public long nextRetry(TransactionFailureException reason, int failureCount) {
-    if (reason instanceof TransactionConflictException) {
-      return failureCount > maxRetries ? -1 : retryDelay;
-    } else {
-      return -1;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java b/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java
deleted file mode 100644
index d1de0e5..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Collection of {@link RetryStrategy}s.
- */
-public final class RetryStrategies {
-  private RetryStrategies() {}
-
-  /**
-   * @param maxRetries max number of retries
-   * @param delayInMs delay between retries in milliseconds
-   * @return RetryStrategy that retries transaction execution when transaction fails with
-   *         {@link TransactionConflictException}
-   */
-  public static RetryStrategy retryOnConflict(int maxRetries, long delayInMs) {
-    return new RetryOnConflictStrategy(maxRetries, delayInMs);
-  }
-
-  public static RetryStrategy noRetries() {
-    return NoRetryStrategy.INSTANCE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java b/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java
deleted file mode 100644
index e651d0e..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Retry strategy for failed transactions
- */
-public interface RetryStrategy {
-  /**
-   * Returns the number of milliseconds to wait before retrying the operation.
-   *
-   * @param reason Reason for transaction failure.
-   * @param failureCount Number of times that the request has been failed.
-   * @return Number of milliseconds to wait before retrying the operation. Returning {@code 0} means
-   *         retry it immediately, while negative means abort the operation.
-   */
-  long nextRetry(TransactionFailureException reason, int failureCount);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java b/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java
deleted file mode 100644
index 25ccbfb..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.zookeeper.ZKClientService;
-
-import java.io.PrintStream;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Allows calling some methods on {@link TransactionManager} from command line.
- */
-public class TransactionAdmin {
-  private static final String OPT_TRUNCATE_INVALID_TX = "--truncate-invalid-tx";
-  private static final String OPT_TRUNCATE_INVALID_TX_BEFORE = "--truncate-invalid-tx-before";
-  private static final String OPT_GET_INVALID_TX_SIZE = "--get-invalid-tx-size";
-  
-  private final PrintStream out;
-  private final PrintStream err;
-
-  public static void main(String[] args) {
-    TransactionAdmin txAdmin = new TransactionAdmin(System.out, System.err);
-    int status = txAdmin.doMain(args, new Configuration());
-    System.exit(status);
-  }
-
-  public TransactionAdmin(PrintStream out, PrintStream err) {
-    this.out = out;
-    this.err = err;
-  }
-
-  @VisibleForTesting
-  int doMain(String[] args, Configuration conf) {
-    if (args.length < 1) {
-      printUsage();
-      return 1;
-    }
-
-    Injector injector = Guice.createInjector(
-      new ConfigModule(conf),
-      new ZKModule(),
-      new DiscoveryModules().getDistributedModules(),
-      new TransactionModules().getDistributedModules(),
-      new TransactionClientModule()
-    );
-
-    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
-    zkClient.startAndWait();
-    
-    try {
-      TransactionSystemClient txClient = injector.getInstance(TransactionSystemClient.class);
-      String option = args[0];
-      
-      if (option.equals(OPT_TRUNCATE_INVALID_TX)) {
-        if (args.length != 2) {
-          printUsage();
-          return 1;
-        }
-        Set<Long> txIds;
-        try {
-          txIds = parseTxIds(args[1]);
-        } catch (NumberFormatException e) {
-          err.println("NumberFormatException: " + e.getMessage());
-          return 1;
-        }
-        if (!txIds.isEmpty()) {
-          out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
-          txClient.truncateInvalidTx(txIds);
-          out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
-        }
-      } else if (option.equals(OPT_TRUNCATE_INVALID_TX_BEFORE)) {
-        if (args.length != 2) {
-          printUsage();
-          return 1;
-        }
-        try {
-          long time = Long.parseLong(args[1]);
-          out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
-          txClient.truncateInvalidTxBefore(time);
-          out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
-        } catch (InvalidTruncateTimeException e) {
-          err.println(e.getMessage());
-          return 1;
-        } catch (NumberFormatException e) {
-          err.println("NumberFormatException: " + e.getMessage());
-          return 1;
-        }
-      } else if (option.equals(OPT_GET_INVALID_TX_SIZE)) {
-        if (args.length != 1) {
-          printUsage();
-          return 1;
-        }
-        out.println("Invalid list size: " + txClient.getInvalidSize());
-      } else {
-        printUsage();
-        return 1;
-      }
-    } finally {
-      zkClient.stopAndWait();
-    }
-    return 0;
-  }
-  
-  private Set<Long> parseTxIds(String option) throws NumberFormatException {
-    Set<Long> txIds = Sets.newHashSet();
-    for (String str : Splitter.on(',').split(option)) {
-      txIds.add(Long.parseLong(str));
-    }
-    return txIds;
-  }
-  
-  private void printUsage() {
-    String programName = TransactionAdmin.class.getSimpleName();
-    String spaces = "     ";
-    List<String> options = Lists.newArrayList();
-    options.add(join("Usage: "));
-    options.add(join(spaces, programName, OPT_TRUNCATE_INVALID_TX, "<tx1,tx2,...>"));
-    options.add(join(spaces, programName, OPT_TRUNCATE_INVALID_TX_BEFORE, "<time in secs>"));
-    options.add(join(spaces, programName, OPT_GET_INVALID_TX_SIZE));
-    
-    String usage = Joiner.on(System.getProperty("line.separator")).join(options);
-    err.println(usage);
-  }
-  
-  private static String join(String... args) {
-    return Joiner.on(" ").join(args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java b/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java
deleted file mode 100644
index 403b1ec..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.distributed.TransactionConverterUtils;
-import co.cask.tephra.distributed.thrift.TTransaction;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-
-import java.io.IOException;
-
-/**
- * Handles serialization and deserialization of {@link co.cask.tephra.Transaction} instances to and from {@code byte[]}.
- */
-public class TransactionCodec {
-
-  public TransactionCodec() {
-  }
-
-  public byte[] encode(Transaction tx) throws IOException {
-    TTransaction thriftTx = TransactionConverterUtils.wrap(tx);
-    TSerializer serializer = new TSerializer();
-    try {
-      return serializer.serialize(thriftTx);
-    } catch (TException te) {
-      throw new IOException(te);
-    }
-  }
-
-  public Transaction decode(byte[] encoded) throws IOException {
-    TTransaction thriftTx = new TTransaction();
-    TDeserializer deserializer = new TDeserializer();
-    try {
-      deserializer.deserialize(thriftTx, encoded);
-      return TransactionConverterUtils.unwrap(thriftTx);
-    } catch (TException te) {
-      throw new IOException(te);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java b/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java
deleted file mode 100644
index 5b31ebe..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import javax.annotation.Nullable;
-
-/**
- * Utility class that encapsulates the transaction life cycle over a given set of
- * transaction-aware datasets. It is not thread-safe for concurrent execution.
- */
-public class TransactionContext {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
-
-  private final Collection<TransactionAware> txAwares;
-  private final TransactionSystemClient txClient;
-
-  private Transaction currentTx;
-
-  public TransactionContext(TransactionSystemClient txClient, TransactionAware... txAwares) {
-    this(txClient, ImmutableList.copyOf(txAwares));
-  }
-
-  public TransactionContext(TransactionSystemClient txClient, Iterable<TransactionAware> txAwares) {
-    // Use a set to avoid adding the same TransactionAware twice and to make removal faster.
-    // Use a linked hash set so that insertion order is preserved (same behavior as when it was using a List).
-    this.txAwares = Sets.newLinkedHashSet(txAwares);
-    this.txClient = txClient;
-  }
-
-  /**
-   * Adds a new transaction-aware to participate in the transaction.
-   * @param txAware the new transaction-aware
-   */
-  public boolean addTransactionAware(TransactionAware txAware) {
-    // If the txAware is newly added, call startTx as well if there is an active transaction
-    boolean added = txAwares.add(txAware);
-    if (added && currentTx != null) {
-      txAware.startTx(currentTx);
-    }
-    return added;
-  }
-
-  /**
-   * Removes a {@link TransactionAware} and withdraws from participation in the transaction.
-   * Withdrawal is only allowed if there is no active transaction.
-   *
-   * @param txAware the {@link TransactionAware} to be removed
-   * @return true if the given {@link TransactionAware} is removed; false otherwise.
-   * @throws IllegalStateException if there is an active transaction going on with this TransactionContext.
-   */
-  public boolean removeTransactionAware(TransactionAware txAware) {
-    Preconditions.checkState(currentTx == null, "Cannot remove TransactionAware while there is an active transaction.");
-    return txAwares.remove(txAware);
-  }
-
-  /**
-   * Starts a new transaction.  Calling this will initiate a new transaction using the {@link TransactionSystemClient},
-   * and pass the returned transaction to {@link TransactionAware#startTx(Transaction)} for each registered
-   * TransactionAware.  If an exception is encountered, the transaction will be aborted and a
-   * {@code TransactionFailureException} wrapping the root cause will be thrown.
-   *
-   * @throws TransactionFailureException if an exception occurs starting the transaction with any registered
-   *     TransactionAware
-   */
-  public void start() throws TransactionFailureException {
-    currentTx = txClient.startShort();
-    for (TransactionAware txAware : txAwares) {
-      try {
-        txAware.startTx(currentTx);
-      } catch (Throwable e) {
-        String message = String.format("Unable to start transaction-aware '%s' for transaction %d. ",
-                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
-        LOG.warn(message, e);
-        txClient.abort(currentTx);
-        throw new TransactionFailureException(message, e);
-      }
-    }
-  }
-
-  /**
-   * Commits the current transaction.  This will: check for any conflicts, based on the change set aggregated from
-   * all registered {@link TransactionAware} instances; flush any pending writes from the {@code TransactionAware}s;
-   * commit the current transaction with the {@link TransactionSystemClient}; and clear the current transaction state.
-   *
-   * @throws TransactionConflictException if a conflict is detected with a recently committed transaction
-   * @throws TransactionFailureException if an error occurs while committing
-   */
-  public void finish() throws TransactionFailureException {
-    Preconditions.checkState(currentTx != null, "Cannot finish tx that has not been started");
-    // each of these steps will abort and roll back the tx in case of errors, and throw an exception
-    checkForConflicts();
-    persist();
-    commit();
-    postCommit();
-    currentTx = null;
-  }
-
-  /**
-   * Aborts the current transaction, and rolls back all data set changes. If rollback fails,
-   * the transaction is invalidated. If an exception is caught during rollback, the exception
-   * is rethrown wrapped in a TransactionFailureException, after all remaining TransactionAwares have
-   * completed rollback.
-   *
-   * @throws TransactionFailureException for any exception that is encountered.
-   */
-  public void abort() throws TransactionFailureException {
-    abort(null);
-  }
-
-  /**
-   * Checkpoints the current transaction by flushing any pending writes for the registered {@link TransactionAware}
-   * instances, and obtaining a new current write pointer for the transaction.  By performing a checkpoint,
-   * the client can ensure that all previous writes were flushed and are visible.  By default, the current write
-   * pointer for the transaction is also visible.  The current write pointer can be excluded from read
-   * operations by calling {@link Transaction#setVisibility(Transaction.VisibilityLevel)} with the visibility level set
-   * to {@link Transaction.VisibilityLevel#SNAPSHOT_EXCLUDE_CURRENT} on the {@link Transaction} instance created
-   * by the checkpoint call, which can be retrieved by calling {@link #getCurrentTransaction()}.
-   *
-   * After the checkpoint operation is performed, the updated
-   * {@link Transaction} instance will be passed to {@link TransactionAware#startTx(Transaction)} for each
-   * registered {@code TransactionAware} instance.
-   *
-   * @throws TransactionFailureException if an error occurs while performing the checkpoint
-   */
-  public void checkpoint() throws TransactionFailureException {
-    Preconditions.checkState(currentTx != null, "Cannot checkpoint tx that has not been started");
-    persist();
-    try {
-      currentTx = txClient.checkpoint(currentTx);
-      // update the current transaction with all TransactionAwares
-      for (TransactionAware txAware : txAwares) {
-        txAware.updateTx(currentTx);
-      }
-    } catch (TransactionNotInProgressException e) {
-      String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
-      LOG.warn(message, e);
-      abort(new TransactionFailureException(message, e));
-      // abort will throw that exception
-    } catch (Throwable e) {
-      String message = String.format("Exception from checkpoint for transaction %d.", currentTx.getTransactionId());
-      LOG.warn(message, e);
-      abort(new TransactionFailureException(message, e));
-      // abort will throw that exception
-    }
-  }
-
-  /**
-   * Returns the current transaction or null if no transaction is currently in progress.
-   */
-  @Nullable
-  public Transaction getCurrentTransaction() {
-    return currentTx;
-  }
-
-  // CHECKSTYLE IGNORE "@throws" FOR 11 LINES
-  /**
-   * Aborts the current transaction, and rolls back all data set changes. If rollback fails,
-   * the transaction is invalidated. If an exception is caught during rollback, the exception
-   * is rethrown wrapped into a TransactionFailureException, after all remaining TransactionAwares have
-   * completed rollback. If an existing exception is passed in, that exception is thrown in either
-   * case, whether the rollback is successful or not. In other words, this method always throws the
-   * first exception that it encounters.
-   * @param cause the original exception that caused the abort
-   * @throws TransactionFailureException for any exception that is encountered.
-   */
-  public void abort(TransactionFailureException cause) throws TransactionFailureException {
-    if (currentTx == null) {
-      // might be called by some generic exception handler even though already aborted/finished - we allow that
-      return;
-    }
-    try {
-      boolean success = true;
-      for (TransactionAware txAware : txAwares) {
-        try {
-          if (!txAware.rollbackTx()) {
-            success = false;
-          }
-        } catch (Throwable e) {
-          String message = String.format("Unable to roll back changes in transaction-aware '%s' for transaction %d. ",
-                                         txAware.getTransactionAwareName(), currentTx.getTransactionId());
-          LOG.warn(message, e);
-          if (cause == null) {
-            cause = new TransactionFailureException(message, e);
-          }
-          success = false;
-        }
-      }
-      if (success) {
-        txClient.abort(currentTx);
-      } else {
-        txClient.invalidate(currentTx.getTransactionId());
-      }
-      if (cause != null) {
-        throw cause;
-      }
-    } finally {
-      currentTx = null;
-    }
-  }
-
-  private void checkForConflicts() throws TransactionFailureException {
-    Collection<byte[]> changes = Lists.newArrayList();
-    for (TransactionAware txAware : txAwares) {
-      try {
-        changes.addAll(txAware.getTxChanges());
-      } catch (Throwable e) {
-        String message = String.format("Unable to retrieve changes from transaction-aware '%s' for transaction %d. ",
-                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
-        LOG.warn(message, e);
-        abort(new TransactionFailureException(message, e));
-        // abort will throw that exception
-      }
-    }
-
-    boolean canCommit = false;
-    try {
-      canCommit = txClient.canCommit(currentTx, changes);
-    } catch (TransactionNotInProgressException e) {
-      String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
-      LOG.warn(message, e);
-      abort(new TransactionFailureException(message, e));
-      // abort will throw that exception
-    } catch (Throwable e) {
-      String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId());
-      LOG.warn(message, e);
-      abort(new TransactionFailureException(message, e));
-      // abort will throw that exception
-    }
-    if (!canCommit) {
-      String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
-      abort(new TransactionConflictException(message));
-      // abort will throw
-    }
-  }
-
-  private void persist() throws TransactionFailureException {
-    for (TransactionAware txAware : txAwares) {
-      boolean success;
-      Throwable cause = null;
-      try {
-        success = txAware.commitTx();
-      } catch (Throwable e) {
-        success = false;
-        cause = e;
-      }
-      if (!success) {
-        String message = String.format("Unable to persist changes of transaction-aware '%s' for transaction %d. ",
-                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
-        if (cause == null) {
-          LOG.warn(message);
-        } else {
-          LOG.warn(message, cause);
-        }
-        abort(new TransactionFailureException(message, cause));
-        // abort will throw that exception
-      }
-    }
-  }
-
-  private void commit() throws TransactionFailureException {
-    boolean commitSuccess = false;
-    try {
-      commitSuccess = txClient.commit(currentTx);
-    } catch (TransactionNotInProgressException e) {
-      String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
-      LOG.warn(message, e);
-      abort(new TransactionFailureException(message, e));
-      // abort will throw that exception
-    } catch (Throwable e) {
-      String message = String.format("Exception from commit for transaction %d.", currentTx.getTransactionId());
-      LOG.warn(message, e);
-      abort(new TransactionFailureException(message, e));
-      // abort will throw that exception
-    }
-    if (!commitSuccess) {
-      String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
-      abort(new TransactionConflictException(message));
-      // abort will throw
-    }
-  }
-
-  private void postCommit() throws TransactionFailureException {
-    TransactionFailureException cause = null;
-    for (TransactionAware txAware : txAwares) {
-      try {
-        txAware.postTxCommit();
-      } catch (Throwable e) {
-        String message = String.format("Unable to perform post-commit in transaction-aware '%s' for transaction %d. ",
-                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
-        LOG.warn(message, e);
-        cause = new TransactionFailureException(message, e);
-      }
-    }
-    if (cause != null) {
-      throw cause;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java b/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java
deleted file mode 100644
index a98be14..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Thrown when taking a snapshot fails.
- */
-public class TransactionCouldNotTakeSnapshotException extends Exception {
-  public TransactionCouldNotTakeSnapshotException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionExecutor.java b/tephra-core/src/main/java/co/cask/tephra/TransactionExecutor.java
deleted file mode 100644
index a1cb188..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionExecutor.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.util.concurrent.Callable;
-
-/**
- * Utility that wraps the execution of a function into the context of a transaction.
- */
-// todo: implementations should throw something other than TransactionFailureException in case of user code error?
-// todo: accept only Callable? Executors util has a way to convert everything to Callable...
-public interface TransactionExecutor {
-
-  /**
-   * A function is a class with a single method that takes an argument and returns a result.
-   * @param <I> the type of the argument
-   * @param <O> the type of the result
-   */
-  public interface Function<I, O> {
-    O apply(I input) throws Exception;
-  }
-
-  /**
-   * A procedure is a class with a single void method that takes an argument.
-   * @param <I> the type of the argument
-   */
-  public interface Procedure<I> {
-    void apply(I input) throws Exception;
-  }
-
-  /**
-   * A subroutine is a class with a single void method without arguments.
-   */
-  public interface Subroutine {
-    void apply() throws Exception;
-  }
-
-  // Due to a bug in checkstyle, it would emit false positives here of the form
-  // "Unable to get class information for @throws tag '<exn>' (...)".
-  // This comment disables that check up to the corresponding ON comments below
-
-  // CHECKSTYLE OFF: @throws
-
-  /**
-   * Execute a function under transactional semantics. A transaction is started and all datasets
-   * are initialized with the transaction. Then the passed function is executed, the transaction
-   * is committed, and the function return value is returned as the return value of this method.
-   * If any exception is caught, the transaction is aborted and the original exception is rethrown,
-   * wrapped into a TransactionFailureException. If the transaction fails due to a write conflict,
-   * a TransactionConflictException is thrown.
-   * @param function the function to execute
-   * @param input the input parameter for the function
-   * @param <I> the input type of the function
-   * @param <O> the result type of the function
-   * @return the function's return value
-   * @throws TransactionConflictException if there is a write conflict with another transaction.
-   * @throws TransactionFailureException if any exception is caught, be it from the function or from the datasets.
-   */
-  <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException;
-
-  // CHECKSTYLE ON
-
-  /**
-   * Like {@link #execute(Function, Object)} but without a return value.
-   */
-  <I> void execute(Procedure<I> procedure, I input) throws TransactionFailureException, InterruptedException;
-
-  /**
-   * Like {@link #execute(Function, Object)} but the callable has no argument.
-   */
-  <O> O execute(Callable<O> callable) throws TransactionFailureException, InterruptedException;
-
-  /**
-   * Like {@link #execute(Function, Object)} but without argument or return value.
-   */
-  void execute(Subroutine subroutine) throws TransactionFailureException, InterruptedException;
-
-  /**
-   * Same as {@link #execute(Function, Object)} but
-   * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)}
-   */
-  <I, O> O executeUnchecked(Function<I, O> function, I input);
-
-  /**
-   * Same as {@link #execute(Procedure, Object)} but
-   * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)}
-   */
-  <I> void executeUnchecked(Procedure<I> procedure, I input);
-
-  /**
-   * Same as {@link #execute(Callable)} but
-   * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)}
-   */
-  <O> O executeUnchecked(Callable<O> callable);
-
-  /**
-   * Same as {@link #execute(Subroutine)} but
-   * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)}
-   */
-  void executeUnchecked(Subroutine subroutine);
-
-  /**
-   * Same as {@link #execute(Function, Object)} but executes asynchronously
-   */
-  <I, O> ListenableFuture<O> submit(Function<I, O> function, I input);
-
-  /**
-   * Same as {@link #execute(Procedure, Object)} but executes asynchronously
-   */
-  <I> ListenableFuture<?> submit(Procedure<I> procedure, I input);
-
-  /**
-   * Same as {@link #execute(Callable)} but executes asynchronously
-   */
-  <O> ListenableFuture<O> submit(Callable<O> callable);
-
-  /**
-   * Same as {@link #execute(Subroutine)} but executes asynchronously
-   */
-  ListenableFuture<?> submit(Subroutine subroutine);
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java b/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java
deleted file mode 100644
index 4f30478..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * A factory for transaction executors.
- */
-public interface TransactionExecutorFactory {
-
-  TransactionExecutor createExecutor(Iterable<TransactionAware> txAwares);
-
-}