You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:40 UTC

[40/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java b/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java
new file mode 100644
index 0000000..4c33538
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Strategy that decides whether, and after what delay, a failed transaction is retried.
+ */
+public interface RetryStrategy {
+  /**
+   * Returns the number of milliseconds to wait before retrying the operation.
+   *
+   * @param reason the failure that caused the transaction to fail.
+   * @param failureCount number of times that the request has failed.
+   * @return number of milliseconds to wait before retrying the operation. Returning {@code 0} means
+   *         retry immediately, while a negative value means abort the operation.
+   */
+  long nextRetry(TransactionFailureException reason, int failureCount);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java b/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java
new file mode 100644
index 0000000..e5902a7
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.zookeeper.ZKClientService;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Allows calling some methods on {@link TransactionManager} from command line.
+ */
+public class TransactionAdmin {
+  private static final String OPT_TRUNCATE_INVALID_TX = "--truncate-invalid-tx";
+  private static final String OPT_TRUNCATE_INVALID_TX_BEFORE = "--truncate-invalid-tx-before";
+  private static final String OPT_GET_INVALID_TX_SIZE = "--get-invalid-tx-size";
+  
+  private final PrintStream out;
+  private final PrintStream err;
+
+  public static void main(String[] args) {
+    // Runs the tool on the process' standard streams; doMain's result becomes the exit code.
+    int exitCode = new TransactionAdmin(System.out, System.err).doMain(args, new Configuration());
+    System.exit(exitCode);
+  }
+
+  public TransactionAdmin(PrintStream out, PrintStream err) {
+    this.out = out;  // receives regular output (invalid-list sizes)
+    this.err = err;  // receives usage text and error messages
+  }
+
+  @VisibleForTesting
+  int doMain(String[] args, Configuration conf) {  // exit status: 0 on success, 1 on usage or argument errors
+    if (args.length < 1) {
+      printUsage();
+      return 1;
+    }
+
+    Injector injector = Guice.createInjector(
+      new ConfigModule(conf),
+      new ZKModule(),
+      new DiscoveryModules().getDistributedModules(),
+      new TransactionModules().getDistributedModules(),
+      new TransactionClientModule()
+    );
+
+    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
+    zkClient.startAndWait();
+    // From here on every exit path must stop the ZooKeeper client again, hence the try/finally.
+    try {
+      TransactionSystemClient txClient = injector.getInstance(TransactionSystemClient.class);
+      String option = args[0];
+      // Dispatch on the first argument; each option validates its own argument count.
+      if (option.equals(OPT_TRUNCATE_INVALID_TX)) {
+        if (args.length != 2) {
+          printUsage();
+          return 1;
+        }
+        Set<Long> txIds;
+        try {
+          txIds = parseTxIds(args[1]);
+        } catch (NumberFormatException e) {
+          err.println("NumberFormatException: " + e.getMessage());
+          return 1;
+        }
+        if (!txIds.isEmpty()) {  // nothing to truncate for an empty id set
+          out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
+          txClient.truncateInvalidTx(txIds);
+          out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
+        }
+      } else if (option.equals(OPT_TRUNCATE_INVALID_TX_BEFORE)) {
+        if (args.length != 2) {
+          printUsage();
+          return 1;
+        }
+        try {
+          long time = Long.parseLong(args[1]);
+          out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
+          txClient.truncateInvalidTxBefore(time);
+          out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
+        } catch (InvalidTruncateTimeException e) {
+          err.println(e.getMessage());
+          return 1;
+        } catch (NumberFormatException e) {
+          err.println("NumberFormatException: " + e.getMessage());
+          return 1;
+        }
+      } else if (option.equals(OPT_GET_INVALID_TX_SIZE)) {
+        if (args.length != 1) {
+          printUsage();
+          return 1;
+        }
+        out.println("Invalid list size: " + txClient.getInvalidSize());
+      } else {
+        printUsage();  // unknown option
+        return 1;
+      }
+    } finally {
+      zkClient.stopAndWait();
+    }
+    return 0;
+  }
+  
+  private Set<Long> parseTxIds(String option) throws NumberFormatException {
+    Set<Long> txIds = Sets.newHashSet();  // ids of a comma-separated list; padding and empty entries are tolerated
+    for (String str : Splitter.on(',').trimResults().omitEmptyStrings().split(option)) {
+      txIds.add(Long.parseLong(str));  // non-numeric entries still fail with NumberFormatException
+    }
+    return txIds;
+  }
+  
+  private void printUsage() {
+    // One usage line per supported option, each indented and prefixed with the program name.
+    String indent = "     ";
+    String program = TransactionAdmin.class.getSimpleName();
+    List<String> usageLines = Lists.newArrayList(
+      join("Usage: "),
+      join(indent, program, OPT_TRUNCATE_INVALID_TX, "<tx1,tx2,...>"),
+      join(indent, program, OPT_TRUNCATE_INVALID_TX_BEFORE, "<time in secs>"),
+      join(indent, program, OPT_GET_INVALID_TX_SIZE));
+    // The usage text goes to the error stream, joined with the platform line separator.
+    err.println(Joiner.on(System.getProperty("line.separator")).join(usageLines));
+  }
+  
+  private static String join(String... args) {
+    return Joiner.on(' ').join(args);  // parts separated by a single space
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java b/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java
new file mode 100644
index 0000000..c147917
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import org.apache.tephra.distributed.TransactionConverterUtils;
+import org.apache.tephra.distributed.thrift.TTransaction;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+import java.io.IOException;
+
+/**
+ * Codec that converts {@link Transaction} instances to their {@code byte[]} form and back, using Thrift.
+ */
+public class TransactionCodec {
+
+  public TransactionCodec() {
+  }
+
+  public byte[] encode(Transaction tx) throws IOException {
+    // Wrap into the Thrift representation, then serialize; Thrift errors are reported as IOException.
+    TTransaction wrapped = TransactionConverterUtils.wrap(tx);
+    try {
+      return new TSerializer().serialize(wrapped);
+    } catch (TException thriftError) {
+      throw new IOException(thriftError);
+    }
+  }
+
+  public Transaction decode(byte[] encoded) throws IOException {
+    // Populate an empty Thrift object from the bytes, then unwrap it into a Transaction.
+    TTransaction target = new TTransaction();
+    try {
+      new TDeserializer().deserialize(target, encoded);
+      return TransactionConverterUtils.unwrap(target);
+    } catch (TException thriftError) {
+      throw new IOException(thriftError);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
new file mode 100644
index 0000000..22a59c6
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import javax.annotation.Nullable;
+
+/**
+ * Utility class that encapsulates the transaction life cycle over a given set of
+ * transaction-aware datasets. It is not thread-safe for concurrent execution.
+ */
+public class TransactionContext {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
+
+  private final Collection<TransactionAware> txAwares;
+  private final TransactionSystemClient txClient;
+
+  private Transaction currentTx;
+
+  public TransactionContext(TransactionSystemClient txClient, TransactionAware... txAwares) {
+    this(txClient, ImmutableList.copyOf(txAwares));  // varargs convenience; delegates to the Iterable constructor
+  }
+
+  public TransactionContext(TransactionSystemClient txClient, Iterable<TransactionAware> txAwares) {
+    this.txClient = txClient;
+    // A set rejects duplicate registrations of the same TransactionAware and makes removal cheap;
+    // the linked variant keeps insertion order, as the former List-based implementation did.
+    this.txAwares = Sets.newLinkedHashSet(txAwares);
+  }
+
+  /**
+   * Adds a new transaction-aware to participate in the transaction; joins the active transaction, if any.
+   * @param txAware the new transaction-aware
+   * @return true if it was newly added, false if it was already registered
+   */
+  public boolean addTransactionAware(TransactionAware txAware) {
+    boolean newlyAdded = txAwares.add(txAware);
+    if (newlyAdded && currentTx != null) {
+      txAware.startTx(currentTx);  // let the late joiner take part in the running transaction
+    }
+    return newlyAdded;
+  }
+
+  /**
+   * Unregisters a {@link TransactionAware} so that it no longer participates in transactions of this context.
+   * This is only permitted while no transaction is active.
+   * @param txAware the {@link TransactionAware} to be removed
+   * @return true if the given {@link TransactionAware} is removed; false otherwise.
+   * @throws IllegalStateException if there is an active transaction going on with this TransactionContext.
+   */
+  public boolean removeTransactionAware(TransactionAware txAware) {
+    boolean idle = currentTx == null;
+    Preconditions.checkState(idle, "Cannot remove TransactionAware while there is an active transaction.");
+    return txAwares.remove(txAware);
+  }
+
+  /**
+   * Begins a new transaction obtained through {@code startShort()} on the {@link TransactionSystemClient} and
+   * hands it to every registered participant through {@link TransactionAware#startTx(Transaction)}. The first
+   * participant that fails to start causes the transaction to be aborted with the client, and the failure is
+   * rethrown wrapped in a {@code TransactionFailureException}.
+   *
+   * @throws TransactionFailureException if any registered TransactionAware fails to start the transaction
+   */
+  public void start() throws TransactionFailureException {
+    currentTx = txClient.startShort();
+    for (TransactionAware participant : txAwares) {
+      try {
+        participant.startTx(currentTx);
+      } catch (Throwable t) {
+        // NOTE(review): currentTx stays set after this abort, and participants already started are not rolled back.
+        String failure = String.format("Unable to start transaction-aware '%s' for transaction %d. ",
+                                       participant.getTransactionAwareName(), currentTx.getTransactionId());
+        LOG.warn(failure, t);
+        txClient.abort(currentTx);
+        throw new TransactionFailureException(failure, t);
+      }
+    }
+  }
+
+  /**
+   * Commits the current transaction: checks for conflicts on the aggregated change set, flushes pending writes,
+   * commits with the {@link TransactionSystemClient}, runs post-commit hooks, and clears the current transaction.
+   * @throws TransactionConflictException if a conflict is detected with a recently committed transaction
+   * @throws TransactionFailureException if an error occurs while committing or in a post-commit hook
+   */
+  public void finish() throws TransactionFailureException {
+    Preconditions.checkState(currentTx != null, "Cannot finish tx that has not been started");
+    checkForConflicts();  // this and the next two steps abort and roll back the tx themselves before throwing
+    persist();
+    commit();
+    try {
+      postCommit();
+    } finally {
+      currentTx = null;  // committed by now: clear it even if a hook failed, so a later abort() is a no-op
+    }
+  }
+
+  /**
+   * Aborts the current transaction, and rolls back all data set changes. If rollback fails,
+   * the transaction is invalidated. If an exception is caught during rollback, the exception
+   * is rethrown wrapped in a TransactionFailureException, after all remaining TransactionAwares have
+   * completed rollback. Does nothing if no transaction is active.
+   *
+   * @throws TransactionFailureException for any exception that is encountered.
+   */
+  public void abort() throws TransactionFailureException {
+    abort(null);
+  }
+
+  /**
+   * Checkpoints the current transaction by flushing any pending writes for the registered {@link TransactionAware}
+   * instances, and obtaining a new current write pointer for the transaction.  By performing a checkpoint,
+   * the client can ensure that all previous writes were flushed and are visible.  By default, the current write
+   * pointer for the transaction is also visible.  The current write pointer can be excluded from read
+   * operations by calling {@link Transaction#setVisibility(Transaction.VisibilityLevel)} with the visibility level set
+   * to {@link Transaction.VisibilityLevel#SNAPSHOT_EXCLUDE_CURRENT} on the {@link Transaction} instance created
+   * by the checkpoint call, which can be retrieved by calling {@link #getCurrentTransaction()}.
+   *
+   * After the checkpoint operation is performed, the updated {@link Transaction} instance will be passed to
+   * {@link TransactionAware#updateTx(Transaction)} for each registered {@code TransactionAware} instance.
+   * If flushing, the checkpoint call or that update fails, the transaction is aborted before the exception is thrown.
+   *
+   * @throws TransactionFailureException if an error occurs while performing the checkpoint
+   */
+  public void checkpoint() throws TransactionFailureException {
+    Preconditions.checkState(currentTx != null, "Cannot checkpoint tx that has not been started");
+    persist();
+    try {
+      currentTx = txClient.checkpoint(currentTx);
+      // update the current transaction with all TransactionAwares
+      for (TransactionAware txAware : txAwares) {
+        txAware.updateTx(currentTx);
+      }
+    } catch (TransactionNotInProgressException e) {
+      String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort(...) rolls back and then rethrows the exception passed to it
+    } catch (Throwable e) {
+      String message = String.format("Exception from checkpoint for transaction %d.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort(...) rolls back and then rethrows the exception passed to it
+    }
+  }
+
+  /**
+   * Returns the transaction set by {@link #start()} or {@link #checkpoint()}, or null if none is in progress.
+   */
+  @Nullable
+  public Transaction getCurrentTransaction() {
+    return currentTx;
+  }
+
+  // CHECKSTYLE IGNORE "@throws" FOR 11 LINES
+  /**
+   * Aborts the current transaction, and rolls back all data set changes. If rollback fails,
+   * the transaction is invalidated. If an exception is caught during rollback, the exception
+   * is rethrown wrapped into a TransactionFailureException, after all remaining TransactionAwares have
+   * completed rollback. If an existing exception is passed in, that exception is thrown in either
+   * case, whether the rollback is successful or not. In other words, this method always throws the
+   * first exception that it encounters.
+   * @param cause the original exception that caused the abort; may be null if there is none
+   * @throws TransactionFailureException for any exception that is encountered.
+   */
+  public void abort(TransactionFailureException cause) throws TransactionFailureException {
+    if (currentTx == null) {
+      // might be called by some generic exception handler even though already aborted/finished - we allow that
+      return;
+    }
+    try {
+      boolean success = true;  // turns false as soon as any participant fails to roll back
+      for (TransactionAware txAware : txAwares) {
+        try {
+          if (!txAware.rollbackTx()) {
+            success = false;
+          }
+        } catch (Throwable e) {
+          String message = String.format("Unable to roll back changes in transaction-aware '%s' for transaction %d. ",
+                                         txAware.getTransactionAwareName(), currentTx.getTransactionId());
+          LOG.warn(message, e);
+          if (cause == null) {  // keep only the first exception; later ones are just logged
+            cause = new TransactionFailureException(message, e);
+          }
+          success = false;
+        }
+      }
+      if (success) {
+        txClient.abort(currentTx);
+      } else {
+        txClient.invalidate(currentTx.getTransactionId());  // at least one rollback failed: invalidate instead
+      }
+      if (cause != null) {
+        throw cause;
+      }
+    } finally {
+      currentTx = null;  // cleared on every exit path, including when an exception propagates
+    }
+  }
+
+  private void checkForConflicts() throws TransactionFailureException {  // aborts and throws on failure or conflict
+    Collection<byte[]> changes = Lists.newArrayList();  // change set aggregated over all participants
+    for (TransactionAware txAware : txAwares) {
+      try {
+        changes.addAll(txAware.getTxChanges());
+      } catch (Throwable e) {
+        String message = String.format("Unable to retrieve changes from transaction-aware '%s' for transaction %d. ",
+                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
+        LOG.warn(message, e);
+        abort(new TransactionFailureException(message, e));
+        // abort(...) rethrows the exception it is given, which also ends this loop
+      }
+    }
+
+    boolean canCommit = false;
+    try {
+      canCommit = txClient.canCommit(currentTx, changes);
+    } catch (TransactionNotInProgressException e) {
+      String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort(...) rethrows the exception it is given
+    } catch (Throwable e) {
+      String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort(...) rethrows the exception it is given
+    }
+    if (!canCommit) {
+      String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
+      abort(new TransactionConflictException(message));
+      // abort(...) rethrows the conflict exception after rolling back
+    }
+  }
+
+  private void persist() throws TransactionFailureException {
+    for (TransactionAware participant : txAwares) {
+      Throwable error = null;
+      boolean persisted = false;
+      try {
+        persisted = participant.commitTx();
+      } catch (Throwable t) {
+        error = t;
+      }
+      if (persisted) {
+        continue;
+      }
+      String failure = String.format("Unable to persist changes of transaction-aware '%s' for transaction %d. ",
+                                     participant.getTransactionAwareName(), currentTx.getTransactionId());
+      if (error != null) {
+        LOG.warn(failure, error);
+      } else {
+        LOG.warn(failure);
+      }
+      // a participant that returned false or threw fails the whole tx; abort(...) rethrows what it is given
+      abort(new TransactionFailureException(failure, error));
+    }
+  }
+
+  private void commit() throws TransactionFailureException {
+    // Commits with the tx client; an exception or a negative answer aborts the tx, and abort(...) rethrows.
+    boolean committed = false;
+    try {
+      committed = txClient.commit(currentTx);
+    } catch (Throwable t) {
+      // both failure kinds are handled alike; only the message tells them apart
+      String failure = (t instanceof TransactionNotInProgressException)
+        ? String.format("Transaction %d is not in progress.", currentTx.getTransactionId())
+        : String.format("Exception from commit for transaction %d.", currentTx.getTransactionId());
+      LOG.warn(failure, t);
+      abort(new TransactionFailureException(failure, t));
+      // abort(...) rethrows the exception it is given
+    }
+    if (committed) {
+      return;
+    }
+    // the client refused the commit, which is reported to the caller as a conflict
+    String conflict = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
+    abort(new TransactionConflictException(conflict));
+  }
+
+  private void postCommit() throws TransactionFailureException {
+    TransactionFailureException cause = null;  // first hook failure; rethrown once every hook has run
+    for (TransactionAware txAware : txAwares) {
+      try {
+        txAware.postTxCommit();
+      } catch (Throwable e) {
+        String message = String.format("Unable to perform post-commit in transaction-aware '%s' for transaction %d. ",
+                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
+        LOG.warn(message, e);
+        cause = (cause != null) ? cause : new TransactionFailureException(message, e);  // keep the first failure
+      }
+    }
+    if (cause != null) {
+      throw cause;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java b/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java
new file mode 100644
index 0000000..3af9596
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Thrown when taking a snapshot fails.
+ */
+public class TransactionCouldNotTakeSnapshotException extends Exception {
+  public TransactionCouldNotTakeSnapshotException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java
new file mode 100644
index 0000000..ed5029e
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Utility that wraps the execution of a function into the context of a transaction.
+ */
+// todo: implementations should throw different from TransactionFailureException in case of user code error?
+// todo: accept only Callable? Executors util has a way to convert everything to Callable...
+public interface TransactionExecutor {
+
+  /**
+   * A function is a class with a single method that takes an argument and returns a result.
+   * @param <I> the type of the argument
+   * @param <O> the type of the result
+   */
+  public interface Function<I, O> {
+    O apply(I input) throws Exception;
+  }
+
+  /**
+   * A procedure is a class with a single void method that takes an argument.
+   * @param <I> the type of the argument
+   */
+  public interface Procedure<I> {
+    void apply(I input) throws Exception;
+  }
+
+  /**
+   * A subroutine is a class with a single void method without arguments.
+   */
+  public interface Subroutine {
+    void apply() throws Exception;
+  }
+
+  // Due to a bug in checkstyle, it would emit false positives here of the form
+  // "Unable to get class information for @throws tag '<exn>' (...)".
+  // This comment disables that check up to the corresponding ON comments below
+
+  // CHECKSTYLE OFF: @throws
+
+  /**
+   * Execute a function under transactional semantics. A transaction is started and all datasets are initialized with
+   * the transaction. Then the passed function is executed, the transaction is committed, and the function return value
+   * is returned as the return value of this method. If any exception is caught, the transaction is aborted and the
+   * original exception is rethrown, wrapped into a TransactionFailureException. If the transaction fails due to a
+   * write conflict, a TransactionConflictException is thrown.
+   * @param function the function to execute
+   * @param input the input parameter for the function
+   * @param <I> the input type of the function
+   * @param <O> the result type of the function
+   * @return the function's return value
+   * @throws TransactionConflictException if there is a write conflict with another transaction.
+   * @throws TransactionFailureException if any exception is caught, be it from the function or from the datasets.
+   * @throws InterruptedException presumably if the calling thread is interrupted while executing (TODO confirm).
+   */
+  <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException;
+
+  // CHECKSTYLE ON
+
+  /**
+   * Like {@link #execute(Function, Object)} but without a return value.
+   */
+  <I> void execute(Procedure<I> procedure, I input) throws TransactionFailureException, InterruptedException;
+
+  /**
+   * Like {@link #execute(Function, Object)} but the callable has no argument.
+   */
+  <O> O execute(Callable<O> callable) throws TransactionFailureException, InterruptedException;
+
+  /**
+   * Like {@link #execute(Function, Object)} but without argument or return value.
+   */
+  void execute(Subroutine subroutine) throws TransactionFailureException, InterruptedException;
+
+  /**
+   * Same as {@link #execute(Function, Object)} but declares no checked exceptions:
+   * they are propagated with {@link com.google.common.base.Throwables#propagate(Throwable)}.
+   */
+  <I, O> O executeUnchecked(Function<I, O> function, I input);
+
+  /**
+   * Same as {@link #execute(Procedure, Object)} but declares no checked exceptions:
+   * they are propagated with {@link com.google.common.base.Throwables#propagate(Throwable)}.
+   */
+  <I> void executeUnchecked(Procedure<I> procedure, I input);
+
+  /**
+   * Same as {@link #execute(Callable)} but declares no checked exceptions:
+   * they are propagated with {@link com.google.common.base.Throwables#propagate(Throwable)}.
+   */
+  <O> O executeUnchecked(Callable<O> callable);
+
+  /**
+   * Same as {@link #execute(Subroutine)} but declares no checked exceptions:
+   * they are propagated with {@link com.google.common.base.Throwables#propagate(Throwable)}.
+   */
+  void executeUnchecked(Subroutine subroutine);
+
+  /**
+   * Same as {@link #execute(Function, Object)} but executes asynchronously.
+   */
+  <I, O> ListenableFuture<O> submit(Function<I, O> function, I input);
+
+  /**
+   * Same as {@link #execute(Procedure, Object)} but executes asynchronously.
+   */
+  <I> ListenableFuture<?> submit(Procedure<I> procedure, I input);
+
+  /**
+   * Same as {@link #execute(Callable)} but executes asynchronously.
+   */
+  <O> ListenableFuture<O> submit(Callable<O> callable);
+
+  /**
+   * Same as {@link #execute(Subroutine)} but executes asynchronously.
+   */
+  ListenableFuture<?> submit(Subroutine subroutine);
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java
new file mode 100644
index 0000000..afe0f33
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * A factory for transaction executors.
+ */
+public interface TransactionExecutorFactory {
+  /** Creates a {@link TransactionExecutor} for the given {@link TransactionAware} participants. */
+  TransactionExecutor createExecutor(Iterable<TransactionAware> txAwares);
+
+}