You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:50 UTC
[50/56] [abbrv] [partial] incubator-tephra git commit: Rename package
to org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java b/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java
deleted file mode 100644
index 11a5b55..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Objects;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Bytes;
-import com.google.common.primitives.UnsignedBytes;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-/**
- * Base class for all the common parts of the HBase version-specific {@code TransactionAwareHTable}
- * implementations.
- */
-public abstract class AbstractTransactionAwareTable implements TransactionAware {
- protected final TransactionCodec txCodec;
- // map of write pointers to change set associated with each
- protected final Map<Long, Set<ActionChange>> changeSets;
- protected final TxConstants.ConflictDetection conflictLevel;
- protected Transaction tx;
- protected boolean allowNonTransactional;
-
- public AbstractTransactionAwareTable(TxConstants.ConflictDetection conflictLevel, boolean allowNonTransactional) {
- this.conflictLevel = conflictLevel;
- this.allowNonTransactional = allowNonTransactional;
- this.txCodec = new TransactionCodec();
- this.changeSets = Maps.newHashMap();
- }
-
- /**
- * True if the instance allows non-transaction operations.
- * @return
- */
- public boolean getAllowNonTransactional() {
- return this.allowNonTransactional;
- }
-
- /**
- * Set whether the instance allows non-transactional operations.
- * @param allowNonTransactional
- */
- public void setAllowNonTransactional(boolean allowNonTransactional) {
- this.allowNonTransactional = allowNonTransactional;
- }
-
- @Override
- public void startTx(Transaction tx) {
- this.tx = tx;
- }
-
- @Override
- public void updateTx(Transaction tx) {
- this.tx = tx;
- }
-
- @Override
- public Collection<byte[]> getTxChanges() {
- if (conflictLevel == TxConstants.ConflictDetection.NONE) {
- return Collections.emptyList();
- }
-
- Collection<byte[]> txChanges = new TreeSet<byte[]>(UnsignedBytes.lexicographicalComparator());
- for (Set<ActionChange> changeSet : changeSets.values()) {
- for (ActionChange change : changeSet) {
- txChanges.add(getChangeKey(change.getRow(), change.getFamily(), change.getQualifier()));
- }
- }
- return txChanges;
- }
-
- public byte[] getChangeKey(byte[] row, byte[] family, byte[] qualifier) {
- byte[] key;
- switch (conflictLevel) {
- case ROW:
- key = Bytes.concat(getTableKey(), row);
- break;
- case COLUMN:
- key = Bytes.concat(getTableKey(), row, family, qualifier);
- break;
- case NONE:
- throw new IllegalStateException("NONE conflict detection does not support change keys");
- default:
- throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
- }
- return key;
- }
-
- @Override
- public boolean commitTx() throws Exception {
- return doCommit();
- }
-
- /**
- * Commits any pending writes by flushing the wrapped {@code HTable} instance.
- */
- protected abstract boolean doCommit() throws IOException;
-
- @Override
- public void postTxCommit() {
- tx = null;
- changeSets.clear();
- }
-
- @Override
- public String getTransactionAwareName() {
- return new String(getTableKey(), Charsets.UTF_8);
- }
-
- /**
- * Returns the table name to use as a key prefix for the transaction change set.
- */
- protected abstract byte[] getTableKey();
-
- @Override
- public boolean rollbackTx() throws Exception {
- return doRollback();
- }
-
- /**
- * Rolls back any persisted changes from the transaction by issuing offsetting deletes to the
- * wrapped {@code HTable} instance. How this is handled will depend on the delete API exposed
- * by the specific version of HBase.
- */
- protected abstract boolean doRollback() throws Exception;
-
- protected void addToChangeSet(byte[] row, byte[] family, byte[] qualifier) {
- long currentWritePointer = tx.getWritePointer();
- Set<ActionChange> changeSet = changeSets.get(currentWritePointer);
- if (changeSet == null) {
- changeSet = Sets.newHashSet();
- changeSets.put(currentWritePointer, changeSet);
- }
- switch (conflictLevel) {
- case ROW:
- case NONE:
- // with ROW or NONE conflict detection, we still need to track changes per-family, since this
- // is the granularity at which we will issue deletes for rollback
- changeSet.add(new ActionChange(row, family));
- break;
- case COLUMN:
- changeSet.add(new ActionChange(row, family, qualifier));
- break;
- default:
- throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
- }
- }
-
- /**
- * Record of each transaction that causes a change. This reference is used to rollback
- * any operation upon failure.
- */
- protected class ActionChange {
- private final byte[] row;
- private final byte[] family;
- private final byte[] qualifier;
-
- public ActionChange(byte[] row, byte[] family) {
- this(row, family, null);
- }
-
- public ActionChange(byte[] row, byte[] family, byte[] qualifier) {
- this.row = row;
- this.family = family;
- this.qualifier = qualifier;
- }
-
- public byte[] getRow() {
- return row;
- }
-
- public byte[] getFamily() {
- return family;
- }
-
- public byte[] getQualifier() {
- return qualifier;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == null || o.getClass() != this.getClass()) {
- return false;
- }
-
- if (o == this) {
- return true;
- }
-
- ActionChange other = (ActionChange) o;
- return Objects.equal(this.row, other.row) &&
- Objects.equal(this.family, other.family) &&
- Objects.equal(this.qualifier, other.qualifier);
- }
-
- @Override
- public int hashCode() {
- int result = Arrays.hashCode(row);
- result = 31 * result + (family != null ? Arrays.hashCode(family) : 0);
- result = 31 * result + (qualifier != null ? Arrays.hashCode(qualifier) : 0);
- return result;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java b/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java
deleted file mode 100644
index f2a28a8..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-
-/**
- * Provides implementation of asynchronous methods of {@link TransactionExecutor} by delegating their execution
- * to respective synchronous methods via provided {@link ExecutorService}.
- */
-public abstract class AbstractTransactionExecutor implements TransactionExecutor {
- private final ListeningExecutorService executorService;
-
- protected AbstractTransactionExecutor(ExecutorService executorService) {
- this.executorService = MoreExecutors.listeningDecorator(executorService);
- }
-
- @Override
- public <I, O> O executeUnchecked(Function<I, O> function, I input) {
- try {
- return execute(function, input);
- } catch (TransactionFailureException e) {
- throw Throwables.propagate(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public <I> void executeUnchecked(Procedure<I> procedure, I input) {
- try {
- execute(procedure, input);
- } catch (TransactionFailureException e) {
- throw Throwables.propagate(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public <O> O executeUnchecked(Callable<O> callable) {
- try {
- return execute(callable);
- } catch (TransactionFailureException e) {
- throw Throwables.propagate(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public void executeUnchecked(Subroutine subroutine) {
- try {
- execute(subroutine);
- } catch (TransactionFailureException e) {
- throw Throwables.propagate(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public <I, O> ListenableFuture<O> submit(final Function<I, O> function, final I input) {
- return executorService.submit(new Callable<O>() {
- @Override
- public O call() throws Exception {
- return execute(function, input);
- }
- });
- }
-
- @Override
- public <I> ListenableFuture<?> submit(final Procedure<I> procedure, final I input) {
- return executorService.submit(new Callable<Object>() {
- @Override
- public I call() throws Exception {
- execute(procedure, input);
- return null;
- }
- });
- }
-
- @Override
- public <O> ListenableFuture<O> submit(final Callable<O> callable) {
- return executorService.submit(new Callable<O>() {
- @Override
- public O call() throws Exception {
- return execute(callable);
- }
- });
- }
-
- @Override
- public ListenableFuture<?> submit(final Subroutine subroutine) {
- return executorService.submit(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- execute(subroutine);
- return null;
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/ChangeId.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/ChangeId.java b/tephra-core/src/main/java/co/cask/tephra/ChangeId.java
deleted file mode 100644
index 8413256..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/ChangeId.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import java.util.Arrays;
-
-/**
- * Represents a row key from a data set changed as part of a transaction.
- */
-public final class ChangeId {
- private final byte[] key;
- private final int hash;
-
- public ChangeId(byte[] bytes) {
- key = bytes;
- hash = Arrays.hashCode(bytes);
- }
-
- public byte[] getKey() {
- return key;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (o == null || o.getClass() != ChangeId.class) {
- return false;
- }
- ChangeId other = (ChangeId) o;
- return hash == other.hash && Arrays.equals(key, other.key);
- }
-
- @Override
- public int hashCode() {
- return hash;
- }
-
- @Override
- public String toString() {
- return toStringBinary(key, 0, key.length);
- }
-
- // Copy from Bytes.toStringBinary so that we don't need direct dependencies on Bytes.
- private String toStringBinary(byte [] b, int off, int len) {
- StringBuilder result = new StringBuilder();
- for (int i = off; i < off + len; ++i) {
- int ch = b[i] & 0xFF;
- if ((ch >= '0' && ch <= '9')
- || (ch >= 'A' && ch <= 'Z')
- || (ch >= 'a' && ch <= 'z')
- || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
- result.append((char) ch);
- } else {
- result.append(String.format("\\x%02X", ch));
- }
- }
- return result.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java b/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java
deleted file mode 100644
index f85dd6b..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utility class that encapsulates the transaction life cycle over a given set of
- * transaction-aware datasets. The executor can be reused across multiple invocations
- * of the execute() method. However, it is not thread-safe for concurrent execution.
- * <p>
- * Transaction execution will be retries according to specified in constructor {@link RetryStrategy}.
- * By default {@link RetryOnConflictStrategy} is used with max 20 retries and 100 ms between retries.
- * </p>
- */
-public class DefaultTransactionExecutor extends AbstractTransactionExecutor {
-
- private final Collection<TransactionAware> txAwares;
- private final TransactionSystemClient txClient;
- private final RetryStrategy retryStrategy;
-
- /**
- * Convenience constructor, has same affect as {@link #DefaultTransactionExecutor(TransactionSystemClient, Iterable)}
- */
- public DefaultTransactionExecutor(TransactionSystemClient txClient, TransactionAware... txAwares) {
- this(txClient, Arrays.asList(txAwares));
- }
-
-
- public DefaultTransactionExecutor(TransactionSystemClient txClient,
- Iterable<TransactionAware> txAwares,
- RetryStrategy retryStrategy) {
-
- super(MoreExecutors.sameThreadExecutor());
- this.txAwares = ImmutableList.copyOf(txAwares);
- this.txClient = txClient;
- this.retryStrategy = retryStrategy;
- }
-
- /**
- * Constructor for a transaction executor.
- */
- @Inject
- public DefaultTransactionExecutor(TransactionSystemClient txClient, @Assisted Iterable<TransactionAware> txAwares) {
- this(txClient, txAwares, RetryStrategies.retryOnConflict(20, 100));
- }
-
- @Override
- public <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException {
- return executeWithRetry(function, input);
- }
-
- @Override
- public <I> void execute(final Procedure<I> procedure, I input)
- throws TransactionFailureException, InterruptedException {
-
- execute(new Function<I, Void>() {
- @Override
- public Void apply(I input) throws Exception {
- procedure.apply(input);
- return null;
- }
- }, input);
- }
-
- @Override
- public <O> O execute(final Callable<O> callable) throws TransactionFailureException, InterruptedException {
- return execute(new Function<Void, O>() {
- @Override
- public O apply(Void input) throws Exception {
- return callable.call();
- }
- }, null);
- }
-
- @Override
- public void execute(final Subroutine subroutine) throws TransactionFailureException, InterruptedException {
- execute(new Function<Void, Void>() {
- @Override
- public Void apply(Void input) throws Exception {
- subroutine.apply();
- return null;
- }
- }, null);
- }
-
- private <I, O> O executeWithRetry(Function<I, O> function, I input)
- throws TransactionFailureException, InterruptedException {
-
- int retries = 0;
- while (true) {
- try {
- return executeOnce(function, input);
- } catch (TransactionFailureException e) {
- long delay = retryStrategy.nextRetry(e, ++retries);
-
- if (delay < 0) {
- throw e;
- }
-
- if (delay > 0) {
- TimeUnit.MILLISECONDS.sleep(delay);
- }
- }
- }
-
- }
-
- private <I, O> O executeOnce(Function<I, O> function, I input) throws TransactionFailureException {
- TransactionContext txContext = new TransactionContext(txClient, txAwares);
- txContext.start();
- O o = null;
- try {
- o = function.apply(input);
- } catch (Throwable e) {
- txContext.abort(new TransactionFailureException("Transaction function failure for transaction. ", e));
- // abort will throw
- }
- // will throw if smth goes wrong
- txContext.finish();
- return o;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java b/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java
deleted file mode 100644
index b4576ee..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Thrown when truncate invalid list is called with a time, and when there are in-progress transactions that
- * were started before the given time.
- */
-public class InvalidTruncateTimeException extends Exception {
- public InvalidTruncateTimeException(String s) {
- super(s);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java b/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java
deleted file mode 100644
index 30e9ceb..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Does no retries
- */
-public class NoRetryStrategy implements RetryStrategy {
- public static final RetryStrategy INSTANCE = new NoRetryStrategy();
-
- private NoRetryStrategy() {}
-
- @Override
- public long nextRetry(TransactionFailureException reason, int failureCount) {
- return -1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java b/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java
deleted file mode 100644
index 04b9f75..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Retries transaction execution when transaction fails with {@link TransactionConflictException}.
- */
-public class RetryOnConflictStrategy implements RetryStrategy {
- private final int maxRetries;
- private final long retryDelay;
-
- public RetryOnConflictStrategy(int maxRetries, long retryDelay) {
- this.maxRetries = maxRetries;
- this.retryDelay = retryDelay;
- }
-
- @Override
- public long nextRetry(TransactionFailureException reason, int failureCount) {
- if (reason instanceof TransactionConflictException) {
- return failureCount > maxRetries ? -1 : retryDelay;
- } else {
- return -1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java b/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java
deleted file mode 100644
index d1de0e5..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Collection of {@link RetryStrategy}s.
- */
-public final class RetryStrategies {
- private RetryStrategies() {}
-
- /**
- * @param maxRetries max number of retries
- * @param delayInMs delay between retries in milliseconds
- * @return RetryStrategy that retries transaction execution when transaction fails with
- * {@link TransactionConflictException}
- */
- public static RetryStrategy retryOnConflict(int maxRetries, long delayInMs) {
- return new RetryOnConflictStrategy(maxRetries, delayInMs);
- }
-
- public static RetryStrategy noRetries() {
- return NoRetryStrategy.INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java b/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java
deleted file mode 100644
index e651d0e..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Retry strategy for failed transactions
- */
-public interface RetryStrategy {
- /**
- * Returns the number of milliseconds to wait before retrying the operation.
- *
- * @param reason Reason for transaction failure.
- * @param failureCount Number of times that the request has been failed.
- * @return Number of milliseconds to wait before retrying the operation. Returning {@code 0} means
- * retry it immediately, while negative means abort the operation.
- */
- long nextRetry(TransactionFailureException reason, int failureCount);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java b/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java
deleted file mode 100644
index 25ccbfb..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.zookeeper.ZKClientService;
-
-import java.io.PrintStream;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Allows calling some methods on {@link TransactionManager} from command line.
- */
-public class TransactionAdmin {
- private static final String OPT_TRUNCATE_INVALID_TX = "--truncate-invalid-tx";
- private static final String OPT_TRUNCATE_INVALID_TX_BEFORE = "--truncate-invalid-tx-before";
- private static final String OPT_GET_INVALID_TX_SIZE = "--get-invalid-tx-size";
-
- private final PrintStream out;
- private final PrintStream err;
-
- public static void main(String[] args) {
- TransactionAdmin txAdmin = new TransactionAdmin(System.out, System.err);
- int status = txAdmin.doMain(args, new Configuration());
- System.exit(status);
- }
-
- public TransactionAdmin(PrintStream out, PrintStream err) {
- this.out = out;
- this.err = err;
- }
-
- @VisibleForTesting
- int doMain(String[] args, Configuration conf) {
- if (args.length < 1) {
- printUsage();
- return 1;
- }
-
- Injector injector = Guice.createInjector(
- new ConfigModule(conf),
- new ZKModule(),
- new DiscoveryModules().getDistributedModules(),
- new TransactionModules().getDistributedModules(),
- new TransactionClientModule()
- );
-
- ZKClientService zkClient = injector.getInstance(ZKClientService.class);
- zkClient.startAndWait();
-
- try {
- TransactionSystemClient txClient = injector.getInstance(TransactionSystemClient.class);
- String option = args[0];
-
- if (option.equals(OPT_TRUNCATE_INVALID_TX)) {
- if (args.length != 2) {
- printUsage();
- return 1;
- }
- Set<Long> txIds;
- try {
- txIds = parseTxIds(args[1]);
- } catch (NumberFormatException e) {
- err.println("NumberFormatException: " + e.getMessage());
- return 1;
- }
- if (!txIds.isEmpty()) {
- out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
- txClient.truncateInvalidTx(txIds);
- out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
- }
- } else if (option.equals(OPT_TRUNCATE_INVALID_TX_BEFORE)) {
- if (args.length != 2) {
- printUsage();
- return 1;
- }
- try {
- long time = Long.parseLong(args[1]);
- out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
- txClient.truncateInvalidTxBefore(time);
- out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
- } catch (InvalidTruncateTimeException e) {
- err.println(e.getMessage());
- return 1;
- } catch (NumberFormatException e) {
- err.println("NumberFormatException: " + e.getMessage());
- return 1;
- }
- } else if (option.equals(OPT_GET_INVALID_TX_SIZE)) {
- if (args.length != 1) {
- printUsage();
- return 1;
- }
- out.println("Invalid list size: " + txClient.getInvalidSize());
- } else {
- printUsage();
- return 1;
- }
- } finally {
- zkClient.stopAndWait();
- }
- return 0;
- }
-
- private Set<Long> parseTxIds(String option) throws NumberFormatException {
- Set<Long> txIds = Sets.newHashSet();
- for (String str : Splitter.on(',').split(option)) {
- txIds.add(Long.parseLong(str));
- }
- return txIds;
- }
-
- private void printUsage() {
- String programName = TransactionAdmin.class.getSimpleName();
- String spaces = " ";
- List<String> options = Lists.newArrayList();
- options.add(join("Usage: "));
- options.add(join(spaces, programName, OPT_TRUNCATE_INVALID_TX, "<tx1,tx2,...>"));
- options.add(join(spaces, programName, OPT_TRUNCATE_INVALID_TX_BEFORE, "<time in secs>"));
- options.add(join(spaces, programName, OPT_GET_INVALID_TX_SIZE));
-
- String usage = Joiner.on(System.getProperty("line.separator")).join(options);
- err.println(usage);
- }
-
- private static String join(String... args) {
- return Joiner.on(" ").join(args);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java b/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java
deleted file mode 100644
index 403b1ec..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.distributed.TransactionConverterUtils;
-import co.cask.tephra.distributed.thrift.TTransaction;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-
-import java.io.IOException;
-
-/**
- * Handles serialization and deserialization of {@link co.cask.tephra.Transaction} instances to and from {@code byte[]}.
- */
-public class TransactionCodec {
-
- public TransactionCodec() {
- }
-
- public byte[] encode(Transaction tx) throws IOException {
- TTransaction thriftTx = TransactionConverterUtils.wrap(tx);
- TSerializer serializer = new TSerializer();
- try {
- return serializer.serialize(thriftTx);
- } catch (TException te) {
- throw new IOException(te);
- }
- }
-
- public Transaction decode(byte[] encoded) throws IOException {
- TTransaction thriftTx = new TTransaction();
- TDeserializer deserializer = new TDeserializer();
- try {
- deserializer.deserialize(thriftTx, encoded);
- return TransactionConverterUtils.unwrap(thriftTx);
- } catch (TException te) {
- throw new IOException(te);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java b/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java
deleted file mode 100644
index 5b31ebe..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import javax.annotation.Nullable;
-
-/**
- * Utility class that encapsulates the transaction life cycle over a given set of
- * transaction-aware datasets. It is not thread-safe for concurrent execution.
- */
-public class TransactionContext {
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
-
- private final Collection<TransactionAware> txAwares;
- private final TransactionSystemClient txClient;
-
- private Transaction currentTx;
-
- public TransactionContext(TransactionSystemClient txClient, TransactionAware... txAwares) {
- this(txClient, ImmutableList.copyOf(txAwares));
- }
-
- public TransactionContext(TransactionSystemClient txClient, Iterable<TransactionAware> txAwares) {
- // Use a set to avoid adding the same TransactionAware twice and to make removal faster.
- // Use a linked hash set so that insertion order is preserved (same behavior as when it was using a List).
- this.txAwares = Sets.newLinkedHashSet(txAwares);
- this.txClient = txClient;
- }
-
- /**
- * Adds a new transaction-aware to participate in the transaction.
- * @param txAware the new transaction-aware
- */
- public boolean addTransactionAware(TransactionAware txAware) {
- // If the txAware is newly added, call startTx as well if there is an active transaction
- boolean added = txAwares.add(txAware);
- if (added && currentTx != null) {
- txAware.startTx(currentTx);
- }
- return added;
- }
-
- /**
- * Removes a {@link TransactionAware} and withdraws from participation in the transaction.
- * Withdrawal is only allowed if there is no active transaction.
- *
- * @param txAware the {@link TransactionAware} to be removed
- * @return true if the given {@link TransactionAware} is removed; false otherwise.
- * @throws IllegalStateException if there is an active transaction going on with this TransactionContext.
- */
- public boolean removeTransactionAware(TransactionAware txAware) {
- Preconditions.checkState(currentTx == null, "Cannot remove TransactionAware while there is an active transaction.");
- return txAwares.remove(txAware);
- }
-
- /**
- * Starts a new transaction. Calling this will initiate a new transaction using the {@link TransactionSystemClient},
- * and pass the returned transaction to {@link TransactionAware#startTx(Transaction)} for each registered
- * TransactionAware. If an exception is encountered, the transaction will be aborted and a
- * {@code TransactionFailureException} wrapping the root cause will be thrown.
- *
- * @throws TransactionFailureException if an exception occurs starting the transaction with any registered
- * TransactionAware
- */
- public void start() throws TransactionFailureException {
- currentTx = txClient.startShort();
- for (TransactionAware txAware : txAwares) {
- try {
- txAware.startTx(currentTx);
- } catch (Throwable e) {
- String message = String.format("Unable to start transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
- txClient.abort(currentTx);
- throw new TransactionFailureException(message, e);
- }
- }
- }
-
- /**
- * Commits the current transaction. This will: check for any conflicts, based on the change set aggregated from
- * all registered {@link TransactionAware} instances; flush any pending writes from the {@code TransactionAware}s;
- * commit the current transaction with the {@link TransactionSystemClient}; and clear the current transaction state.
- *
- * @throws TransactionConflictException if a conflict is detected with a recently committed transaction
- * @throws TransactionFailureException if an error occurs while committing
- */
- public void finish() throws TransactionFailureException {
- Preconditions.checkState(currentTx != null, "Cannot finish tx that has not been started");
- // each of these steps will abort and rollback the tx in case if errors, and throw an exception
- checkForConflicts();
- persist();
- commit();
- postCommit();
- currentTx = null;
- }
-
- /**
- * Aborts the given transaction, and rolls back all data set changes. If rollback fails,
- * the transaction is invalidated. If an exception is caught during rollback, the exception
- * is rethrown wrapped in a TransactionFailureException, after all remaining TransactionAwares have
- * completed rollback.
- *
- * @throws TransactionFailureException for any exception that is encountered.
- */
- public void abort() throws TransactionFailureException {
- abort(null);
- }
-
- /**
- * Checkpoints the current transaction by flushing any pending writes for the registered {@link TransactionAware}
- * instances, and obtaining a new current write pointer for the transaction. By performing a checkpoint,
- * the client can ensure that all previous writes were flushed and are visible. By default, the current write
- * pointer for the transaction is also visible. The current write pointer can be excluded from read
- * operations by calling {@link Transaction#setVisibility(Transaction.VisibilityLevel)} with the visibility level set
- * to {@link Transaction.VisibilityLevel#SNAPSHOT_EXCLUDE_CURRENT} on the {@link Transaction} instance created
- * by the checkpoint call, which can be retrieved by calling {@link #getCurrentTransaction()}.
- *
- * After the checkpoint operation is performed, the updated
- * {@link Transaction} instance will be passed to {@link TransactionAware#startTx(Transaction)} for each
- * registered {@code TransactionAware} instance.
- *
- * @throws TransactionFailureException if an error occurs while performing the checkpoint
- */
- public void checkpoint() throws TransactionFailureException {
- Preconditions.checkState(currentTx != null, "Cannot checkpoint tx that has not been started");
- persist();
- try {
- currentTx = txClient.checkpoint(currentTx);
- // update the current transaction with all TransactionAwares
- for (TransactionAware txAware : txAwares) {
- txAware.updateTx(currentTx);
- }
- } catch (TransactionNotInProgressException e) {
- String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
- LOG.warn(message, e);
- abort(new TransactionFailureException(message, e));
- // abort will throw that exception
- } catch (Throwable e) {
- String message = String.format("Exception from checkpoint for transaction %d.", currentTx.getTransactionId());
- LOG.warn(message, e);
- abort(new TransactionFailureException(message, e));
- // abort will throw that exception
- }
- }
-
- /**
- * Returns the current transaction or null if no transaction is currently in progress.
- */
- @Nullable
- public Transaction getCurrentTransaction() {
- return currentTx;
- }
-
- // CHECKSTYLE IGNORE "@throws" FOR 11 LINES
- /**
- * Aborts the given transaction, and rolls back all data set changes. If rollback fails,
- * the transaction is invalidated. If an exception is caught during rollback, the exception
- * is rethrown wrapped into a TransactionFailureException, after all remaining TransactionAwares have
- * completed rollback. If an existing exception is passed in, that exception is thrown in either
- * case, whether the rollback is successful or not. In other words, this method always throws the
- * first exception that it encounters.
- * @param cause the original exception that caused the abort
- * @throws TransactionFailureException for any exception that is encountered.
- */
- public void abort(TransactionFailureException cause) throws TransactionFailureException {
- if (currentTx == null) {
- // might be called by some generic exception handler even though already aborted/finished - we allow that
- return;
- }
- try {
- boolean success = true;
- for (TransactionAware txAware : txAwares) {
- try {
- if (!txAware.rollbackTx()) {
- success = false;
- }
- } catch (Throwable e) {
- String message = String.format("Unable to roll back changes in transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
- if (cause == null) {
- cause = new TransactionFailureException(message, e);
- }
- success = false;
- }
- }
- if (success) {
- txClient.abort(currentTx);
- } else {
- txClient.invalidate(currentTx.getTransactionId());
- }
- if (cause != null) {
- throw cause;
- }
- } finally {
- currentTx = null;
- }
- }
-
- private void checkForConflicts() throws TransactionFailureException {
- Collection<byte[]> changes = Lists.newArrayList();
- for (TransactionAware txAware : txAwares) {
- try {
- changes.addAll(txAware.getTxChanges());
- } catch (Throwable e) {
- String message = String.format("Unable to retrieve changes from transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
- abort(new TransactionFailureException(message, e));
- // abort will throw that exception
- }
- }
-
- boolean canCommit = false;
- try {
- canCommit = txClient.canCommit(currentTx, changes);
- } catch (TransactionNotInProgressException e) {
- String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
- LOG.warn(message, e);
- abort(new TransactionFailureException(message, e));
- // abort will throw that exception
- } catch (Throwable e) {
- String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId());
- LOG.warn(message, e);
- abort(new TransactionFailureException(message, e));
- // abort will throw that exception
- }
- if (!canCommit) {
- String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
- abort(new TransactionConflictException(message));
- // abort will throw
- }
- }
-
- private void persist() throws TransactionFailureException {
- for (TransactionAware txAware : txAwares) {
- boolean success;
- Throwable cause = null;
- try {
- success = txAware.commitTx();
- } catch (Throwable e) {
- success = false;
- cause = e;
- }
- if (!success) {
- String message = String.format("Unable to persist changes of transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- if (cause == null) {
- LOG.warn(message);
- } else {
- LOG.warn(message, cause);
- }
- abort(new TransactionFailureException(message, cause));
- // abort will throw that exception
- }
- }
- }
-
- private void commit() throws TransactionFailureException {
- boolean commitSuccess = false;
- try {
- commitSuccess = txClient.commit(currentTx);
- } catch (TransactionNotInProgressException e) {
- String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
- LOG.warn(message, e);
- abort(new TransactionFailureException(message, e));
- // abort will throw that exception
- } catch (Throwable e) {
- String message = String.format("Exception from commit for transaction %d.", currentTx.getTransactionId());
- LOG.warn(message, e);
- abort(new TransactionFailureException(message, e));
- // abort will throw that exception
- }
- if (!commitSuccess) {
- String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
- abort(new TransactionConflictException(message));
- // abort will throw
- }
- }
-
- private void postCommit() throws TransactionFailureException {
- TransactionFailureException cause = null;
- for (TransactionAware txAware : txAwares) {
- try {
- txAware.postTxCommit();
- } catch (Throwable e) {
- String message = String.format("Unable to perform post-commit in transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
- cause = new TransactionFailureException(message, e);
- }
- }
- if (cause != null) {
- throw cause;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java b/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java
deleted file mode 100644
index a98be14..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * Throw when taking a snapshot fails.
- */
-public class TransactionCouldNotTakeSnapshotException extends Exception {
- public TransactionCouldNotTakeSnapshotException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionExecutor.java b/tephra-core/src/main/java/co/cask/tephra/TransactionExecutor.java
deleted file mode 100644
index a1cb188..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionExecutor.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.util.concurrent.Callable;
-
-/**
- * Utility that wraps the execution of a function into the context of a transaction.
- */
-// todo: implementations should throw different from TransactionFailureException in case of user code error?
-// todo: accept only Callable? Executors util has a way to convert everything to Callable...
-public interface TransactionExecutor {
-
- /**
- * A function is a class with a single method that takes an argument and returns a result.
- * @param <I> the type of the argument
- * @param <O> the type of the result
- */
- public interface Function<I, O> {
- O apply(I input) throws Exception;
- }
-
- /**
- * A procedure is a class with a single void method that takes an argument.
- * @param <I> the type of the argument
- */
- public interface Procedure<I> {
- void apply(I input) throws Exception;
- }
-
- /**
- * A subroutine is a class with a single void method without arguments.
- */
- public interface Subroutine {
- void apply() throws Exception;
- }
-
- // Due to a bug in checkstyle, it would emit false positives here of the form
- // "Unable to get class information for @throws tag '<exn>' (...)".
- // This comment disables that check up to the corresponding ON comments below
-
- // CHECKSTYLE OFF: @throws
-
- /**
- * Execute a function under transactional semantics. A transaction is started and all datasets
- * are initialized with the transaction. Then the passed function is executed, the transaction
- * is committed, and the function return value is returned as the return value of this method.
- * If any exception is caught, the transaction is aborted and the original exception is rethrown,
- * wrapped into a TransactionFailureException. If the transaction fails due to a write conflict,
- * a TransactionConflictException is thrown.
- * @param function the function to execute
- * @param input the input parameter for the function
- * @param <I> the input type of the function
- * @param <O> the result type of the function
- * @return the function's return value
- * @throws TransactionConflictException if there is a write conflict with another transaction.
- * @throws TransactionFailureException if any exception is caught, be it from the function or from the datasets.
- */
- <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException;
-
- // CHECKSTYLE ON
-
- /**
- * Like {@link #execute(Function, Object)} but without a return value.
- */
- <I> void execute(Procedure<I> procedure, I input) throws TransactionFailureException, InterruptedException;
-
- /**
- * Like {@link #execute(Function, Object)} but the callable has no argument.
- */
- <O> O execute(Callable<O> callable) throws TransactionFailureException, InterruptedException;
-
- /**
- * Like {@link #execute(Function, Object)} but without argument or return value.
- */
- void execute(Subroutine subroutine) throws TransactionFailureException, InterruptedException;
-
- /**
- * Same as {@link #execute(Function, Object)} but
- * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)}
- */
- <I, O> O executeUnchecked(Function<I, O> function, I input);
-
- /**
- * Same as {@link #execute(Procedure, Object)} but
- * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)}
- */
- <I> void executeUnchecked(Procedure<I> procedure, I input);
-
- /**
- * Same as {@link #execute(Callable)} but
- * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)}
- */
- <O> O executeUnchecked(Callable<O> callable);
-
- /**
- * Same as {@link #execute(Subroutine)} but
- * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)}
- */
- void executeUnchecked(Subroutine subroutine);
-
- /**
- * Same as {@link #execute(Function, Object)} but executes asynchronously
- */
- <I, O> ListenableFuture<O> submit(Function<I, O> function, I input);
-
- /**
- * Same as {@link #execute(Procedure, Object)} but executes asynchronously
- */
- <I> ListenableFuture<?> submit(Procedure<I> procedure, I input);
-
- /**
- * Same as {@link #execute(Callable)} but executes asynchronously
- */
- <O> ListenableFuture<O> submit(Callable<O> callable);
-
- /**
- * Same as {@link #execute(Subroutine)} but executes asynchronously
- */
- ListenableFuture<?> submit(Subroutine subroutine);
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java b/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java
deleted file mode 100644
index 4f30478..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-/**
- * A factory for transaction executors.
- */
-public interface TransactionExecutorFactory {
-
- TransactionExecutor createExecutor(Iterable<TransactionAware> txAwares);
-
-}