You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:46 UTC
[40/51] [partial] incubator-tephra git commit: Rename package to
org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java b/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java
new file mode 100644
index 0000000..4c33538
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Retry strategy for failed transactions. An implementation decides, per failure, whether the
+ * operation is retried and how long to wait before doing so.
+ */
+public interface RetryStrategy {
+ /**
+ * Returns the number of milliseconds to wait before retrying the operation.
+ *
+ * @param reason Reason for transaction failure.
+ * @param failureCount Number of times that the request has failed.
+ * @return Number of milliseconds to wait before retrying the operation. Returning {@code 0} means
+ * retry it immediately, while a negative value means abort the operation.
+ */
+ long nextRetry(TransactionFailureException reason, int failureCount);
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java b/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java
new file mode 100644
index 0000000..e5902a7
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.zookeeper.ZKClientService;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Command line entry point exposing a few {@link TransactionManager} operations:
+ * truncating the invalid transaction list (by ids or by time) and printing its size.
+ */
+public class TransactionAdmin {
+  private static final String OPT_TRUNCATE_INVALID_TX = "--truncate-invalid-tx";
+  private static final String OPT_TRUNCATE_INVALID_TX_BEFORE = "--truncate-invalid-tx-before";
+  private static final String OPT_GET_INVALID_TX_SIZE = "--get-invalid-tx-size";
+
+  private final PrintStream out;
+  private final PrintStream err;
+
+  /**
+   * Runs the tool against the standard streams and exits the JVM with the resulting status.
+   */
+  public static void main(String[] args) {
+    int exitCode = new TransactionAdmin(System.out, System.err).doMain(args, new Configuration());
+    System.exit(exitCode);
+  }
+
+  /**
+   * @param out stream receiving regular output
+   * @param err stream receiving usage and error messages
+   */
+  public TransactionAdmin(PrintStream out, PrintStream err) {
+    this.out = out;
+    this.err = err;
+  }
+
+  /**
+   * Parses the arguments, connects to the transaction service and executes the requested option.
+   *
+   * @param args command line arguments; the first one selects the option
+   * @param conf configuration used to build the client-side injector
+   * @return {@code 0} on success, {@code 1} on bad usage or when the operation is rejected
+   */
+  @VisibleForTesting
+  int doMain(String[] args, Configuration conf) {
+    if (args.length < 1) {
+      printUsage();
+      return 1;
+    }
+
+    Injector injector = Guice.createInjector(
+      new ConfigModule(conf),
+      new ZKModule(),
+      new DiscoveryModules().getDistributedModules(),
+      new TransactionModules().getDistributedModules(),
+      new TransactionClientModule()
+    );
+
+    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
+    zkClient.startAndWait();
+
+    // The ZK client is stopped on every exit path, including early returns from the helpers.
+    try {
+      TransactionSystemClient txClient = injector.getInstance(TransactionSystemClient.class);
+      String option = args[0];
+
+      if (option.equals(OPT_TRUNCATE_INVALID_TX)) {
+        return runTruncateInvalidTx(txClient, args);
+      }
+      if (option.equals(OPT_TRUNCATE_INVALID_TX_BEFORE)) {
+        return runTruncateInvalidTxBefore(txClient, args);
+      }
+      if (option.equals(OPT_GET_INVALID_TX_SIZE)) {
+        return runGetInvalidTxSize(txClient, args);
+      }
+      printUsage();
+      return 1;
+    } finally {
+      zkClient.stopAndWait();
+    }
+  }
+
+  /**
+   * Handles {@code --truncate-invalid-tx <tx1,tx2,...>}: removes the given ids from the invalid list.
+   */
+  private int runTruncateInvalidTx(TransactionSystemClient txClient, String[] args) {
+    if (args.length != 2) {
+      printUsage();
+      return 1;
+    }
+    Set<Long> txIds;
+    try {
+      txIds = parseTxIds(args[1]);
+    } catch (NumberFormatException e) {
+      err.println("NumberFormatException: " + e.getMessage());
+      return 1;
+    }
+    if (txIds.isEmpty()) {
+      return 0;
+    }
+    out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
+    txClient.truncateInvalidTx(txIds);
+    out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
+    return 0;
+  }
+
+  /**
+   * Handles {@code --truncate-invalid-tx-before <time>}: truncates the invalid list up to the given time.
+   */
+  private int runTruncateInvalidTxBefore(TransactionSystemClient txClient, String[] args) {
+    if (args.length != 2) {
+      printUsage();
+      return 1;
+    }
+    try {
+      long time = Long.parseLong(args[1]);
+      out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
+      txClient.truncateInvalidTxBefore(time);
+      out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
+      return 0;
+    } catch (InvalidTruncateTimeException e) {
+      err.println(e.getMessage());
+      return 1;
+    } catch (NumberFormatException e) {
+      err.println("NumberFormatException: " + e.getMessage());
+      return 1;
+    }
+  }
+
+  /**
+   * Handles {@code --get-invalid-tx-size}: prints the current size of the invalid list.
+   */
+  private int runGetInvalidTxSize(TransactionSystemClient txClient, String[] args) {
+    if (args.length != 1) {
+      printUsage();
+      return 1;
+    }
+    out.println("Invalid list size: " + txClient.getInvalidSize());
+    return 0;
+  }
+
+  /**
+   * Parses a comma-separated list of transaction ids.
+   *
+   * @throws NumberFormatException if any element is not a valid {@code long}
+   */
+  private Set<Long> parseTxIds(String option) throws NumberFormatException {
+    Set<Long> parsed = Sets.newHashSet();
+    for (String token : Splitter.on(',').split(option)) {
+      parsed.add(Long.parseLong(token));
+    }
+    return parsed;
+  }
+
+  /**
+   * Prints the supported options, one per line, to the error stream.
+   */
+  private void printUsage() {
+    String programName = TransactionAdmin.class.getSimpleName();
+    String spaces = " ";
+    List<String> lines = Lists.newArrayList(
+      "Usage: ",
+      join(spaces, programName, OPT_TRUNCATE_INVALID_TX, "<tx1,tx2,...>"),
+      join(spaces, programName, OPT_TRUNCATE_INVALID_TX_BEFORE, "<time in secs>"),
+      join(spaces, programName, OPT_GET_INVALID_TX_SIZE));
+
+    err.println(Joiner.on(System.getProperty("line.separator")).join(lines));
+  }
+
+  /**
+   * Joins the given parts with a single space.
+   */
+  private static String join(String... args) {
+    return Joiner.on(" ").join(args);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java b/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java
new file mode 100644
index 0000000..c147917
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import org.apache.tephra.distributed.TransactionConverterUtils;
+import org.apache.tephra.distributed.thrift.TTransaction;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+import java.io.IOException;
+
+/**
+ * Codec translating {@link Transaction} instances to and from their Thrift-serialized {@code byte[]} form.
+ */
+public class TransactionCodec {
+
+  public TransactionCodec() {
+  }
+
+  /**
+   * Serializes a transaction.
+   *
+   * @param tx the transaction to serialize
+   * @return the Thrift-encoded bytes of the transaction
+   * @throws IOException if Thrift serialization fails; the {@link TException} is attached as the cause
+   */
+  public byte[] encode(Transaction tx) throws IOException {
+    final TTransaction wrapped = TransactionConverterUtils.wrap(tx);
+    final TSerializer thriftSerializer = new TSerializer();
+    try {
+      return thriftSerializer.serialize(wrapped);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Deserializes a transaction previously produced by {@link #encode(Transaction)}.
+   *
+   * @param encoded the Thrift-encoded bytes
+   * @return the decoded transaction
+   * @throws IOException if Thrift deserialization fails; the {@link TException} is attached as the cause
+   */
+  public Transaction decode(byte[] encoded) throws IOException {
+    final TTransaction target = new TTransaction();
+    final TDeserializer thriftDeserializer = new TDeserializer();
+    try {
+      thriftDeserializer.deserialize(target, encoded);
+      return TransactionConverterUtils.unwrap(target);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
new file mode 100644
index 0000000..22a59c6
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import javax.annotation.Nullable;
+
+/**
+ * Utility class that encapsulates the transaction life cycle over a given set of
+ * transaction-aware datasets. It is not thread-safe for concurrent execution.
+ */
+public class TransactionContext {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
+
+  private final Collection<TransactionAware> txAwares;
+  private final TransactionSystemClient txClient;
+
+  // the transaction in progress, or null if none has been started or the last one has ended
+  private Transaction currentTx;
+
+  public TransactionContext(TransactionSystemClient txClient, TransactionAware... txAwares) {
+    this(txClient, ImmutableList.copyOf(txAwares));
+  }
+
+  public TransactionContext(TransactionSystemClient txClient, Iterable<TransactionAware> txAwares) {
+    // Use a set to avoid adding the same TransactionAware twice and to make removal faster.
+    // Use a linked hash set so that insertion order is preserved (same behavior as when it was using a List).
+    this.txAwares = Sets.newLinkedHashSet(txAwares);
+    this.txClient = txClient;
+  }
+
+  /**
+   * Adds a new transaction-aware to participate in the transaction.
+   * @param txAware the new transaction-aware
+   * @return true if the transaction-aware was newly added; false if it was already registered
+   */
+  public boolean addTransactionAware(TransactionAware txAware) {
+    // If the txAware is newly added, call startTx as well if there is an active transaction
+    boolean added = txAwares.add(txAware);
+    if (added && currentTx != null) {
+      txAware.startTx(currentTx);
+    }
+    return added;
+  }
+
+  /**
+   * Removes a {@link TransactionAware} and withdraws from participation in the transaction.
+   * Withdrawal is only allowed if there is no active transaction.
+   *
+   * @param txAware the {@link TransactionAware} to be removed
+   * @return true if the given {@link TransactionAware} is removed; false otherwise.
+   * @throws IllegalStateException if there is an active transaction going on with this TransactionContext.
+   */
+  public boolean removeTransactionAware(TransactionAware txAware) {
+    Preconditions.checkState(currentTx == null, "Cannot remove TransactionAware while there is an active transaction.");
+    return txAwares.remove(txAware);
+  }
+
+  /**
+   * Starts a new transaction. Calling this will initiate a new transaction using the {@link TransactionSystemClient},
+   * and pass the returned transaction to {@link TransactionAware#startTx(Transaction)} for each registered
+   * TransactionAware. If an exception is encountered, the transaction will be aborted and a
+   * {@code TransactionFailureException} wrapping the root cause will be thrown.
+   *
+   * @throws TransactionFailureException if an exception occurs starting the transaction with any registered
+   *     TransactionAware
+   */
+  public void start() throws TransactionFailureException {
+    currentTx = txClient.startShort();
+    for (TransactionAware txAware : txAwares) {
+      try {
+        txAware.startTx(currentTx);
+      } catch (Throwable e) {
+        String message = String.format("Unable to start transaction-aware '%s' for transaction %d. ",
+                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
+        LOG.warn(message, e);
+        txClient.abort(currentTx);
+        throw new TransactionFailureException(message, e);
+      }
+    }
+  }
+
+  /**
+   * Commits the current transaction. This will: check for any conflicts, based on the change set aggregated from
+   * all registered {@link TransactionAware} instances; flush any pending writes from the {@code TransactionAware}s;
+   * commit the current transaction with the {@link TransactionSystemClient}; and clear the current transaction state.
+   * <p>
+   * The current transaction state is cleared on every exit path. In particular, if a post-commit hook fails
+   * after the commit itself succeeded, the exception is propagated but the (already committed) transaction is
+   * no longer considered active, so that a subsequent {@link #abort()} does not attempt to roll it back.
+   *
+   * @throws TransactionConflictException if a conflict is detected with a recently committed transaction
+   * @throws TransactionFailureException if an error occurs while committing
+   */
+  public void finish() throws TransactionFailureException {
+    Preconditions.checkState(currentTx != null, "Cannot finish tx that has not been started");
+    // each of these steps will abort and rollback the tx in case of errors, and throw an exception
+    checkForConflicts();
+    persist();
+    commit();
+    try {
+      postCommit();
+    } finally {
+      // The transaction is committed at this point; clear it even if a post-commit hook failed, otherwise
+      // a later abort() would roll back the participants and abort a committed transaction.
+      currentTx = null;
+    }
+  }
+
+  /**
+   * Aborts the given transaction, and rolls back all data set changes. If rollback fails,
+   * the transaction is invalidated. If an exception is caught during rollback, the exception
+   * is rethrown wrapped in a TransactionFailureException, after all remaining TransactionAwares have
+   * completed rollback.
+   *
+   * @throws TransactionFailureException for any exception that is encountered.
+   */
+  public void abort() throws TransactionFailureException {
+    abort(null);
+  }
+
+  /**
+   * Checkpoints the current transaction by flushing any pending writes for the registered {@link TransactionAware}
+   * instances, and obtaining a new current write pointer for the transaction. By performing a checkpoint,
+   * the client can ensure that all previous writes were flushed and are visible. By default, the current write
+   * pointer for the transaction is also visible. The current write pointer can be excluded from read
+   * operations by calling {@link Transaction#setVisibility(Transaction.VisibilityLevel)} with the visibility level set
+   * to {@link Transaction.VisibilityLevel#SNAPSHOT_EXCLUDE_CURRENT} on the {@link Transaction} instance created
+   * by the checkpoint call, which can be retrieved by calling {@link #getCurrentTransaction()}.
+   *
+   * After the checkpoint operation is performed, the updated
+   * {@link Transaction} instance will be passed to {@link TransactionAware#updateTx(Transaction)} for each
+   * registered {@code TransactionAware} instance.
+   *
+   * @throws TransactionFailureException if an error occurs while performing the checkpoint
+   */
+  public void checkpoint() throws TransactionFailureException {
+    Preconditions.checkState(currentTx != null, "Cannot checkpoint tx that has not been started");
+    persist();
+    try {
+      currentTx = txClient.checkpoint(currentTx);
+      // update the current transaction with all TransactionAwares
+      for (TransactionAware txAware : txAwares) {
+        txAware.updateTx(currentTx);
+      }
+    } catch (TransactionNotInProgressException e) {
+      String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    } catch (Throwable e) {
+      String message = String.format("Exception from checkpoint for transaction %d.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    }
+  }
+
+  /**
+   * Returns the current transaction or null if no transaction is currently in progress.
+   */
+  @Nullable
+  public Transaction getCurrentTransaction() {
+    return currentTx;
+  }
+
+  // CHECKSTYLE IGNORE "@throws" FOR 11 LINES
+  /**
+   * Aborts the given transaction, and rolls back all data set changes. If rollback fails,
+   * the transaction is invalidated. If an exception is caught during rollback, the exception
+   * is rethrown wrapped into a TransactionFailureException, after all remaining TransactionAwares have
+   * completed rollback. If an existing exception is passed in, that exception is thrown in either
+   * case, whether the rollback is successful or not. In other words, this method always throws the
+   * first exception that it encounters.
+   * @param cause the original exception that caused the abort
+   * @throws TransactionFailureException for any exception that is encountered.
+   */
+  public void abort(TransactionFailureException cause) throws TransactionFailureException {
+    if (currentTx == null) {
+      // might be called by some generic exception handler even though already aborted/finished - we allow that
+      return;
+    }
+    try {
+      boolean success = true;
+      for (TransactionAware txAware : txAwares) {
+        try {
+          if (!txAware.rollbackTx()) {
+            success = false;
+          }
+        } catch (Throwable e) {
+          String message = String.format("Unable to roll back changes in transaction-aware '%s' for transaction %d. ",
+                                         txAware.getTransactionAwareName(), currentTx.getTransactionId());
+          LOG.warn(message, e);
+          if (cause == null) {
+            cause = new TransactionFailureException(message, e);
+          }
+          success = false;
+        }
+      }
+      // a clean rollback lets the transaction be aborted; otherwise it must be invalidated
+      if (success) {
+        txClient.abort(currentTx);
+      } else {
+        txClient.invalidate(currentTx.getTransactionId());
+      }
+      if (cause != null) {
+        throw cause;
+      }
+    } finally {
+      currentTx = null;
+    }
+  }
+
+  /**
+   * Collects the change sets of all participants and asks the transaction client whether the transaction
+   * can commit. Aborts the transaction (which throws) on any error or detected conflict.
+   */
+  private void checkForConflicts() throws TransactionFailureException {
+    Collection<byte[]> changes = Lists.newArrayList();
+    for (TransactionAware txAware : txAwares) {
+      try {
+        changes.addAll(txAware.getTxChanges());
+      } catch (Throwable e) {
+        String message = String.format("Unable to retrieve changes from transaction-aware '%s' for transaction %d. ",
+                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
+        LOG.warn(message, e);
+        abort(new TransactionFailureException(message, e));
+        // abort will throw that exception
+      }
+    }
+
+    boolean canCommit = false;
+    try {
+      canCommit = txClient.canCommit(currentTx, changes);
+    } catch (TransactionNotInProgressException e) {
+      String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    } catch (Throwable e) {
+      String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    }
+    if (!canCommit) {
+      String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
+      abort(new TransactionConflictException(message));
+      // abort will throw
+    }
+  }
+
+  /**
+   * Asks every participant to persist its pending changes. Aborts the transaction (which throws) as soon as
+   * one participant reports failure or throws.
+   */
+  private void persist() throws TransactionFailureException {
+    for (TransactionAware txAware : txAwares) {
+      boolean success;
+      Throwable cause = null;
+      try {
+        success = txAware.commitTx();
+      } catch (Throwable e) {
+        success = false;
+        cause = e;
+      }
+      if (!success) {
+        String message = String.format("Unable to persist changes of transaction-aware '%s' for transaction %d. ",
+                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
+        if (cause == null) {
+          LOG.warn(message);
+        } else {
+          LOG.warn(message, cause);
+        }
+        abort(new TransactionFailureException(message, cause));
+        // abort will throw that exception
+      }
+    }
+  }
+
+  /**
+   * Commits the transaction with the transaction client. Aborts the transaction (which throws) on any error
+   * or if the client reports that the commit did not succeed.
+   */
+  private void commit() throws TransactionFailureException {
+    boolean commitSuccess = false;
+    try {
+      commitSuccess = txClient.commit(currentTx);
+    } catch (TransactionNotInProgressException e) {
+      String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    } catch (Throwable e) {
+      String message = String.format("Exception from commit for transaction %d.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    }
+    if (!commitSuccess) {
+      String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
+      abort(new TransactionConflictException(message));
+      // abort will throw
+    }
+  }
+
+  /**
+   * Invokes the post-commit hook of every participant. All participants are called even if some fail;
+   * if any failed, the exception of the last failing participant is thrown afterwards.
+   */
+  private void postCommit() throws TransactionFailureException {
+    TransactionFailureException cause = null;
+    for (TransactionAware txAware : txAwares) {
+      try {
+        txAware.postTxCommit();
+      } catch (Throwable e) {
+        String message = String.format("Unable to perform post-commit in transaction-aware '%s' for transaction %d. ",
+                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
+        LOG.warn(message, e);
+        cause = new TransactionFailureException(message, e);
+      }
+    }
+    if (cause != null) {
+      throw cause;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java b/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java
new file mode 100644
index 0000000..3af9596
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Thrown when taking a snapshot fails.
+ */
+public class TransactionCouldNotTakeSnapshotException extends Exception {
+ /**
+ * Creates the exception with the given detail message.
+ *
+ * @param message description of why the snapshot could not be taken
+ */
+ public TransactionCouldNotTakeSnapshotException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java
new file mode 100644
index 0000000..ed5029e
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Utility that wraps the execution of a function into the context of a transaction.
+ * Each unit of work can be run synchronously with checked exceptions ({@code execute}),
+ * synchronously without checked exceptions ({@code executeUnchecked}), or asynchronously ({@code submit}).
+ */
+// todo: implementations should throw different from TransactionFailureException in case of user code error?
+// todo: accept only Callable? Executors util has a way to convert everything to Callable...
+public interface TransactionExecutor {
+
+ /**
+ * A function is a class with a single method that takes an argument and returns a result.
+ * @param <I> the type of the argument
+ * @param <O> the type of the result
+ */
+ public interface Function<I, O> {
+ O apply(I input) throws Exception;
+ }
+
+ /**
+ * A procedure is a class with a single void method that takes an argument.
+ * @param <I> the type of the argument
+ */
+ public interface Procedure<I> {
+ void apply(I input) throws Exception;
+ }
+
+ /**
+ * A subroutine is a class with a single void method without arguments.
+ */
+ public interface Subroutine {
+ void apply() throws Exception;
+ }
+
+ // Due to a bug in checkstyle, it would emit false positives here of the form
+ // "Unable to get class information for @throws tag '<exn>' (...)".
+ // This comment disables that check up to the corresponding ON comments below
+
+ // CHECKSTYLE OFF: @throws
+
+ /**
+ * Execute a function under transactional semantics. A transaction is started and all datasets
+ * are initialized with the transaction. Then the passed function is executed, the transaction
+ * is committed, and the function return value is returned as the return value of this method.
+ * If any exception is caught, the transaction is aborted and the original exception is rethrown,
+ * wrapped into a TransactionFailureException. If the transaction fails due to a write conflict,
+ * a TransactionConflictException is thrown.
+ * @param function the function to execute
+ * @param input the input parameter for the function
+ * @param <I> the input type of the function
+ * @param <O> the result type of the function
+ * @return the function's return value
+ * @throws TransactionConflictException if there is a write conflict with another transaction.
+ * @throws TransactionFailureException if any exception is caught, be it from the function or from the datasets.
+ * @throws InterruptedException if the executing thread is interrupted
+ * (NOTE(review): the exact conditions depend on the implementation - confirm).
+ */
+ <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException;
+
+ // CHECKSTYLE ON
+
+ /**
+ * Like {@link #execute(Function, Object)} but without a return value.
+ */
+ <I> void execute(Procedure<I> procedure, I input) throws TransactionFailureException, InterruptedException;
+
+ /**
+ * Like {@link #execute(Function, Object)} but the callable has no argument.
+ */
+ <O> O execute(Callable<O> callable) throws TransactionFailureException, InterruptedException;
+
+ /**
+ * Like {@link #execute(Function, Object)} but without argument or return value.
+ */
+ void execute(Subroutine subroutine) throws TransactionFailureException, InterruptedException;
+
+ /**
+ * Same as {@link #execute(Function, Object)} but declares no checked exceptions: any exception
+ * is propagated with {@link com.google.common.base.Throwables#propagate(Throwable)}
+ */
+ <I, O> O executeUnchecked(Function<I, O> function, I input);
+
+ /**
+ * Same as {@link #execute(Procedure, Object)} but declares no checked exceptions: any exception
+ * is propagated with {@link com.google.common.base.Throwables#propagate(Throwable)}
+ */
+ <I> void executeUnchecked(Procedure<I> procedure, I input);
+
+ /**
+ * Same as {@link #execute(Callable)} but declares no checked exceptions: any exception
+ * is propagated with {@link com.google.common.base.Throwables#propagate(Throwable)}
+ */
+ <O> O executeUnchecked(Callable<O> callable);
+
+ /**
+ * Same as {@link #execute(Subroutine)} but declares no checked exceptions: any exception
+ * is propagated with {@link com.google.common.base.Throwables#propagate(Throwable)}
+ */
+ void executeUnchecked(Subroutine subroutine);
+
+ /**
+ * Same as {@link #execute(Function, Object)} but executes asynchronously
+ * and returns a future for the function's return value.
+ */
+ <I, O> ListenableFuture<O> submit(Function<I, O> function, I input);
+
+ /**
+ * Same as {@link #execute(Procedure, Object)} but executes asynchronously
+ * and returns a future for the execution.
+ */
+ <I> ListenableFuture<?> submit(Procedure<I> procedure, I input);
+
+ /**
+ * Same as {@link #execute(Callable)} but executes asynchronously
+ * and returns a future for the callable's return value.
+ */
+ <O> ListenableFuture<O> submit(Callable<O> callable);
+
+ /**
+ * Same as {@link #execute(Subroutine)} but executes asynchronously
+ * and returns a future for the execution.
+ */
+ ListenableFuture<?> submit(Subroutine subroutine);
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java
new file mode 100644
index 0000000..afe0f33
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * A factory for transaction executors.
+ */
+public interface TransactionExecutorFactory {
+
+ /**
+ * Creates a {@link TransactionExecutor} for the given transaction-aware participants.
+ *
+ * @param txAwares the {@link TransactionAware} instances handed to the new executor
+ * (presumably the participants of the transactions it runs - confirm against the implementation)
+ * @return a transaction executor
+ */
+ TransactionExecutor createExecutor(Iterable<TransactionAware> txAwares);
+
+}