You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/09 07:12:26 UTC

[2/2] incubator-tephra git commit: (TEPHRA-241) Add a way to limit the size of a transaction

(TEPHRA-241) Add a way to limit the size of a transaction

This closes #48

Signed-off-by: anew <an...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/ae6ce2b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/ae6ce2b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/ae6ce2b5

Branch: refs/heads/master
Commit: ae6ce2b5e83eef3ef50f7b025b2ff666d539e391
Parents: 8532076
Author: anew <an...@apache.org>
Authored: Thu Aug 31 14:43:32 2017 -0700
Committer: anew <an...@apache.org>
Committed: Sat Sep 9 00:11:47 2017 -0700

----------------------------------------------------------------------
 .../apache/tephra/TransactionSizeException.java |   28 +
 .../org/apache/tephra/TransactionContext.java   |    9 +-
 .../org/apache/tephra/TransactionManager.java   |   72 +-
 .../apache/tephra/TransactionSystemClient.java  |   22 +
 .../java/org/apache/tephra/TxConstants.java     |   17 +
 .../distributed/TransactionServiceClient.java   |   63 +-
 .../TransactionServiceThriftClient.java         |   22 +-
 .../TransactionServiceThriftHandler.java        |   21 +
 .../distributed/thrift/TTransactionServer.java  | 1298 +++++++++++++++++-
 .../tephra/inmemory/DetachedTxSystemClient.java |    5 +
 .../tephra/inmemory/InMemoryTxSystemClient.java |   11 +
 .../tephra/inmemory/MinimalTxSystemClient.java  |    5 +
 tephra-core/src/main/thrift/transaction.thrift  |    2 +
 .../tephra/ThriftTransactionSystemTest.java     |    4 +-
 .../apache/tephra/TransactionContextTest.java   |    5 +
 .../apache/tephra/TransactionExecutorTest.java  |    5 +
 .../apache/tephra/TransactionManagerTest.java   |  121 +-
 .../apache/tephra/TransactionSystemTest.java    |  189 ++-
 .../tephra/snapshot/SnapshotCodecTest.java      |    7 +-
 19 files changed, 1702 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-api/src/main/java/org/apache/tephra/TransactionSizeException.java
----------------------------------------------------------------------
diff --git a/tephra-api/src/main/java/org/apache/tephra/TransactionSizeException.java b/tephra-api/src/main/java/org/apache/tephra/TransactionSizeException.java
new file mode 100644
index 0000000..3ea040f
--- /dev/null
+++ b/tephra-api/src/main/java/org/apache/tephra/TransactionSizeException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Thrown when a transaction's change set has more entries, or more total bytes, than the configured limit.
+ */
+public class TransactionSizeException extends TransactionFailureException {
+  public TransactionSizeException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
index 0806294..8b4e4fd 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
@@ -270,15 +270,12 @@ public class TransactionContext {
 
     boolean canCommit = false;
     try {
-      canCommit = txClient.canCommit(currentTx, changes);
-    } catch (TransactionNotInProgressException e) {
-      String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
-      LOG.warn(message, e);
-      abort(new TransactionFailureException(message, e));
+      canCommit = txClient.canCommitOrThrow(currentTx, changes);
+    } catch (TransactionNotInProgressException | TransactionSizeException e) {
+      abort(e); // abort like every other failure path; passing e itself keeps its specific type for callers
       // abort will throw that exception
     } catch (Throwable e) {
       String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId());
-      LOG.warn(message, e);
       abort(new TransactionFailureException(message, e));
       // abort will throw that exception
     }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 3f332ad..4479812 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -162,6 +162,11 @@ public class TransactionManager extends AbstractService {
   private final Lock logReadLock = logLock.readLock();
   private final Lock logWriteLock = logLock.writeLock();
 
+  private final int changeSetCountLimit;
+  private final int changeSetCountThreshold;
+  private final long changeSetSizeLimit;
+  private final long changeSetSizeThreshold;
+
   // fudge factor (in milliseconds) used when interpreting transactions as LONG based on expiration
   // TODO: REMOVE WITH txnBackwardsCompatCheck()
   private final long longTimeoutTolerance;
@@ -188,6 +193,15 @@ public class TransactionManager extends AbstractService {
     snapshotRetainCount = Math.max(conf.getInt(TxConstants.Manager.CFG_TX_SNAPSHOT_RETAIN,
                                                TxConstants.Manager.DEFAULT_TX_SNAPSHOT_RETAIN), 1);
 
+    changeSetCountLimit = conf.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_LIMIT,
+                                      TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_LIMIT);
+    changeSetCountThreshold = conf.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD,
+                                          TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_WARN_THRESHOLD);
+    changeSetSizeLimit = conf.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_LIMIT,
+                                      TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_LIMIT);
+    changeSetSizeThreshold = conf.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD,
+                                          TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_WARN_THRESHOLD);
+
     // intentionally not using a constant, as this config should not be exposed
     // TODO: REMOVE WITH txnBackwardsCompatCheck()
     longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 10000);
@@ -839,10 +853,13 @@ public class TransactionManager extends AbstractService {
     }
   }
 
-  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
+  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds)
+    throws TransactionNotInProgressException, TransactionSizeException {
+
     txMetricsCollector.rate("canCommit");
     Stopwatch timer = new Stopwatch().start();
-    if (inProgress.get(tx.getTransactionId()) == null) {
+    InProgressTx inProgressTx = inProgress.get(tx.getTransactionId());
+    if (inProgressTx == null) {
       synchronized (this) {
         // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
         if (invalidTxList.contains(tx.getTransactionId())) {
@@ -857,10 +874,8 @@ public class TransactionManager extends AbstractService {
       }
     }
 
-    Set<ChangeId> set = Sets.newHashSetWithExpectedSize(changeIds.size());
-    for (byte[] change : changeIds) {
-      set.add(new ChangeId(change));
-    }
+    Set<ChangeId> set =
+      validateChangeSet(tx, changeIds, inProgressTx.clientId != null ? inProgressTx.clientId : DEFAULT_CLIENTID);
 
     if (hasConflicts(tx, set)) {
       return false;
@@ -880,6 +895,51 @@ public class TransactionManager extends AbstractService {
     return true;
   }
 
+  /**
+   * Validate the number of changes and the total size of changes. Log a warning if either of them exceeds the
+   * configured threshold, or log a warning and throw an exception if it exceeds the configured limit.
+   * The entry count is checked before the changes are iterated; the size is the sum of all change id lengths.
+   * We log here because application developers may ignore warnings. Logging here gives us a single point
+   * (the tx manager log) to identify all clients that send excessively large change sets.
+   * @param clientId the id of the client that the transaction belongs to; it is only used in the log messages.
+   * @return the same set of changes, transformed into a set of {@link ChangeId}s.
+   * @throws TransactionSizeException if the number or the total size of the changes exceeds the limit.
+   */
+  private Set<ChangeId> validateChangeSet(Transaction tx, Collection<byte[]> changeIds,
+                                          String clientId) throws TransactionSizeException {
+    if (changeIds.size() > changeSetCountLimit) {
+      LOG.warn("Change set for transaction {} belonging to client '{}' has {} entries and exceeds " +
+                 "the allowed size of {}. Limit the number of changes, or use a long-running transaction. ",
+               tx.getTransactionId(), clientId, changeIds.size(), changeSetCountLimit);
+      throw new TransactionSizeException(String.format(
+        "Change set for transaction %d has %d entries and exceeds the limit of %d",
+        tx.getTransactionId(), changeIds.size(), changeSetCountLimit));
+    } else if (changeIds.size() > changeSetCountThreshold) {
+      LOG.warn("Change set for transaction {} belonging to client '{}' has {} entries. " +
+                 "It is recommended to limit the number of changes to {}, or to use a long-running transaction. ",
+               tx.getTransactionId(), clientId, changeIds.size(), changeSetCountThreshold);
+    }
+    long byteCount = 0L; // running total of the lengths of all change ids
+    Set<ChangeId> set = Sets.newHashSetWithExpectedSize(changeIds.size());
+    for (byte[] change : changeIds) {
+      set.add(new ChangeId(change));
+      byteCount += change.length;
+    }
+    if (byteCount > changeSetSizeLimit) { // the total is only known after the loop above
+      LOG.warn("Change set for transaction {} belonging to client '{}' has total size of {} bytes and exceeds " +
+                 "the allowed size of {} bytes. Limit the total size of changes, or use a long-running transaction. ",
+               tx.getTransactionId(), clientId, byteCount, changeSetSizeLimit);
+      throw new TransactionSizeException(String.format(
+        "Change set for transaction %d has total size of %d bytes and exceeds the limit of %d bytes",
+        tx.getTransactionId(), byteCount, changeSetSizeLimit));
+    } else if (byteCount > changeSetSizeThreshold) {
+      LOG.warn("Change set for transaction {} belonging to client '{}' has total size of {} bytes. " +
+                 "It is recommended to limit the total size to {} bytes, or to use a long-running transaction. ",
+               tx.getTransactionId(), clientId, byteCount, changeSetSizeThreshold);
+    }
+    return set;
+  }
+
   private void addCommittingChangeSet(long writePointer, Set<ChangeId> changes) {
     committingChangeSets.put(writePointer, changes);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
index 9702c61..a44f131 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
@@ -70,10 +70,32 @@ public interface TransactionSystemClient {
    * @param tx transaction to verify
    * @param changeIds ids of changes made by transaction
    * @return true if transaction can be committed otherwise false
+   * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out.
+   *
+   * @deprecated since 0.13-incubating; use {@link #canCommitOrThrow(Transaction, Collection)} instead
    */
+  @Deprecated
   boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException;
 
   /**
+   * Checks if transaction with the set of changes can be committed. E.g. it can check conflicts with other changes and
+   * refuse commit if there are conflicts. It is assumed that no other changes will be done in between this method call
+   * and {@link #commit(Transaction)} which may check conflicts again to avoid races.
+   * <p/>
+   * Since we do conflict detection at commit time as well, this may seem redundant. The idea is to check for conflicts
+   * before we persist changes to avoid rollback in case of conflicts as much as possible.
+   * NOTE: in some situations we may want to skip this step to save on RPC with a risk of many rollback ops. So by
+   *       default we take safe path.
+   *
+   * @param tx transaction to verify
+   * @param changeIds ids of changes made by transaction
+   * @return true if transaction can be committed otherwise false
+   * @throws TransactionSizeException if the size of the change set exceeds the allowed limit
+   * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out.
+   */
+  boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException;
+
+  /**
    * Makes transaction visible. It will again check conflicts of changes submitted previously with
    * {@link #canCommit(Transaction, java.util.Collection)}
    * @param tx transaction to make visible.

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 1dbd3cb..5c78aa4 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -179,6 +179,23 @@ public class TxConstants {
     public static final String CFG_TX_SNAPSHOT_RETAIN = "data.tx.snapshot.retain";
     /** Default value for number of most recent snapshots to retain. */
     public static final int DEFAULT_TX_SNAPSHOT_RETAIN = 10;
+
+    /** The limit for the number of entries in a change set. If exceeded, the transaction fails. */
+    public static final String CFG_TX_CHANGESET_COUNT_LIMIT = "data.tx.changeset.count.limit";
+    /** The warning threshold for the number of entries in a change set. If exceeded, a warning is logged. */
+    public static final String CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD = "data.tx.changeset.count.warn.threshold";
+    /** The limit for the total size in bytes of a change set. If exceeded, the transaction fails. */
+    public static final String CFG_TX_CHANGESET_SIZE_LIMIT = "data.tx.changeset.size.limit";
+    /** The warning threshold for the total size in bytes of a change set. If exceeded, a warning is logged. */
+    public static final String CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD = "data.tx.changeset.size.warn.threshold";
+    /** The default limit for the number of entries in a change set is unlimited. */
+    public static final int DEFAULT_TX_CHANGESET_COUNT_LIMIT = Integer.MAX_VALUE;
+    /** The default warning threshold for the number of entries in a change set is unlimited. */
+    public static final int DEFAULT_TX_CHANGESET_COUNT_WARN_THRESHOLD = Integer.MAX_VALUE;
+    /** The default limit for the total size in bytes of a change set is unlimited. */
+    public static final long DEFAULT_TX_CHANGESET_SIZE_LIMIT = Long.MAX_VALUE;
+    /** The default warning threshold for the total size in bytes of a change set is unlimited. */
+    public static final long DEFAULT_TX_CHANGESET_SIZE_WARN_THRESHOLD = Long.MAX_VALUE;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
index cdcca7f..f1743de 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
@@ -28,7 +28,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionFailureException;
 import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSizeException;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.runtime.ConfigModule;
@@ -65,6 +67,11 @@ public class TransactionServiceClient implements TransactionSystemClient {
   // client id that is used to identify the transactions
   private final String clientId;
 
+  private final int changeSetCountLimit;
+  private final int changeSetCountThreshold;
+  private final long changeSetSizeLimit;
+  private final long changeSetSizeThreshold;
+
   /**
    * Utility to be used for basic verification of transaction system availability and functioning
    * @param args arguments list, accepts single option "-v" that makes it to print out more details about started tx
@@ -109,7 +116,7 @@ public class TransactionServiceClient implements TransactionSystemClient {
                    ", inProgress: " + tx.getInProgress().length);
       }
       LOG.info("Checking if canCommit tx...");
-      boolean canCommit = client.canCommit(tx, Collections.<byte[]>emptyList());
+      boolean canCommit = client.canCommitOrThrow(tx, Collections.<byte[]>emptyList());
       LOG.info("canCommit: " + canCommit);
       if (canCommit) {
         LOG.info("Committing tx...");
@@ -171,6 +178,15 @@ public class TransactionServiceClient implements TransactionSystemClient {
 
     this.clientProvider = clientProvider;
     this.clientId = clientId;
+
+    changeSetCountLimit = config.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_LIMIT,
+                                        TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_LIMIT);
+    changeSetCountThreshold = config.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD,
+                                            TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_WARN_THRESHOLD);
+    changeSetSizeLimit = config.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_LIMIT,
+                                        TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_LIMIT);
+    changeSetSizeThreshold = config.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD,
+                                            TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_WARN_THRESHOLD);
   }
 
   /**
@@ -324,6 +340,51 @@ public class TransactionServiceClient implements TransactionSystemClient {
   }
 
+  @Override
+  public boolean canCommitOrThrow(final Transaction tx, final Collection<byte[]> changeIds)
+    throws TransactionFailureException {
+
+    // Validate the size of the change set here, before sending it over the wire: if the change set is large,
+    // it can cause memory issues on the server side. A limit violation is thrown here without being logged.
+    if (changeIds.size() > changeSetCountLimit) {
+      throw new TransactionSizeException(String.format(
+        "Change set for transaction %d has %d entries and exceeds the limit of %d",
+        tx.getTransactionId(), changeIds.size(), changeSetCountLimit));
+    } else if (changeIds.size() > changeSetCountThreshold) {
+      LOG.warn("Change set for transaction {} has {} entries. " +
+                 "It is recommended to limit the number of changes to {}, or to use a long-running transaction. ",
+               tx.getTransactionId(), changeIds.size(), changeSetCountThreshold);
+    }
+    long byteCount = 0L; // total bytes across all change ids
+    for (byte[] change : changeIds) {
+      byteCount += change.length;
+    }
+    if (byteCount > changeSetSizeLimit) {
+      throw new TransactionSizeException(String.format(
+        "Change set for transaction %d has total size of %d bytes and exceeds the limit of %d bytes",
+        tx.getTransactionId(), byteCount, changeSetSizeLimit));
+    } else if (byteCount > changeSetSizeThreshold) {
+      LOG.warn("Change set for transaction {} has total size of {} bytes. " +
+                 "It is recommended to limit the total size to {} bytes, or to use a long-running transaction. ",
+               tx.getTransactionId(), byteCount, changeSetSizeThreshold);
+    }
+
+    try {
+      return execute(
+        new Operation<Boolean>("canCommit") {
+          @Override
+          public Boolean execute(TransactionServiceThriftClient client)
+            throws Exception {
+            return client.canCommitOrThrow(tx, changeIds);
+          }
+        });
+    } catch (TransactionNotInProgressException | TransactionSizeException e) {
+      throw e; // the two documented failures keep their type; anything else is propagated below
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
   public boolean commit(final Transaction tx) throws TransactionNotInProgressException {
     try {
       return this.execute(

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
index ccd266a..ba37243 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
@@ -25,6 +25,7 @@ import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
 import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSizeException;
 import org.apache.tephra.distributed.thrift.TGenericException;
 import org.apache.tephra.distributed.thrift.TInvalidTruncateTimeException;
 import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException;
@@ -196,7 +197,26 @@ public class TransactionServiceThriftClient {
     }
   }
 
-
+  /** Asks the server, via the canCommitOrThrow RPC, whether tx can commit; Thrift errors map to Tephra's. */
+  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds)
+    throws TException, TransactionNotInProgressException, TransactionSizeException {
+    try {
+      return client.canCommitOrThrow(TransactionConverterUtils.wrap(tx),
+                                     ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue();
+    } catch (TTransactionNotInProgressException e) {
+      throw new TransactionNotInProgressException(e.getMessage());
+    } catch (TGenericException e) { // the handler reports a TransactionSizeException this way, naming its class
+      if (TransactionSizeException.class.getName().equals(e.getOriginalExceptionClass())) {
+        throw new TransactionSizeException(e.getMessage());
+      }
+      LOG.trace("Expecting only {} as the original exception class but found {}",
+                TransactionSizeException.class.getName(), e.getOriginalExceptionClass());
+      throw e;
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
 
   public boolean commit(Transaction tx) throws TException, TransactionNotInProgressException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
index 174b463..0c9105b 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSizeException;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.distributed.thrift.TBoolean;
 import org.apache.tephra.distributed.thrift.TGenericException;
@@ -129,6 +130,26 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface
       return new TBoolean(txManager.canCommit(TransactionConverterUtils.unwrap(tx), changeIds));
     } catch (TransactionNotInProgressException e) {
       throw new TTransactionNotInProgressException(e.getMessage());
+    } catch (TransactionSizeException e) {
+      return new TBoolean(false); // can't throw exception -> just indicate that it failed
+    }
+  }
+
+  @Override
+  public TBoolean canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes) throws TException {
+    // copy each buffer's remaining bytes into its own array: txManager.canCommit takes byte[] change ids
+    Set<byte[]> changeIds = Sets.newHashSet();
+    for (ByteBuffer bb : changes) {
+      byte[] changeId = new byte[bb.remaining()];
+      bb.get(changeId);
+      changeIds.add(changeId);
+    }
+    try {
+      return new TBoolean(txManager.canCommit(TransactionConverterUtils.unwrap(tx), changeIds));
+    } catch (TransactionNotInProgressException e) {
+      throw new TTransactionNotInProgressException(e.getMessage());
+    } catch (TransactionSizeException e) { // sent as a TGenericException that carries the original class name
+      throw new TGenericException(e.getMessage(), TransactionSizeException.class.getName());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
index 6c99bb4..6c07ccb 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
@@ -68,6 +68,8 @@ public class TTransactionServer {
 
     public TBoolean canCommitTx(TTransaction tx, Set<ByteBuffer> changes) throws TTransactionNotInProgressException, org.apache.thrift.TException;
 
+    public TBoolean canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes) throws TTransactionNotInProgressException, TGenericException, org.apache.thrift.TException;
+
     public TBoolean commitTx(TTransaction tx) throws TTransactionNotInProgressException, org.apache.thrift.TException;
 
     public void abortTx(TTransaction tx) throws org.apache.thrift.TException;
@@ -110,6 +112,8 @@ public class TTransactionServer {
 
     public void canCommitTx(TTransaction tx, Set<ByteBuffer> changes, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.canCommitTx_call> resultHandler) throws org.apache.thrift.TException;
 
+    public void canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.canCommitOrThrow_call> resultHandler) throws org.apache.thrift.TException;
+
     public void commitTx(TTransaction tx, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.commitTx_call> resultHandler) throws org.apache.thrift.TException;
 
     public void abortTx(TTransaction tx, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.abortTx_call> resultHandler) throws org.apache.thrift.TException;
@@ -353,6 +357,36 @@ public class TTransactionServer {
       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "canCommitTx failed: unknown result");
     }
 
+    public TBoolean canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes) throws TTransactionNotInProgressException, TGenericException, org.apache.thrift.TException
+    {
+      send_canCommitOrThrow(tx, changes);
+      return recv_canCommitOrThrow();
+    }
+
+    public void send_canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes) throws org.apache.thrift.TException
+    {
+      canCommitOrThrow_args args = new canCommitOrThrow_args();
+      args.setTx(tx);
+      args.setChanges(changes);
+      sendBase("canCommitOrThrow", args);
+    }
+
+    public TBoolean recv_canCommitOrThrow() throws TTransactionNotInProgressException, TGenericException, org.apache.thrift.TException
+    {
+      canCommitOrThrow_result result = new canCommitOrThrow_result();
+      receiveBase(result, "canCommitOrThrow");
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      if (result.e != null) {
+        throw result.e;
+      }
+      if (result.g != null) {
+        throw result.g;
+      }
+      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "canCommitOrThrow failed: unknown result");
+    }
+
     public TBoolean commitTx(TTransaction tx) throws TTransactionNotInProgressException, org.apache.thrift.TException
     {
       send_commitTx(tx);
@@ -878,6 +912,41 @@ public class TTransactionServer {
       }
     }
 
+    /**
+     * Starts an asynchronous canCommitOrThrow call.
+     *
+     * After checkReady(), the arguments are wrapped in a canCommitOrThrow_call, that call is
+     * recorded as the client's current method and it is handed to the async manager.
+     *
+     * @param tx the transaction to send
+     * @param changes the change-set keys to send
+     * @param resultHandler callback passed on to the created method call
+     * @throws org.apache.thrift.TException if building or dispatching the call fails
+     */
+    public void canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes, org.apache.thrift.async.AsyncMethodCallback<canCommitOrThrow_call> resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      final canCommitOrThrow_call pendingCall =
+          new canCommitOrThrow_call(tx, changes, resultHandler, this, ___protocolFactory, ___transport);
+      ___currentMethod = pendingCall;
+      ___manager.call(pendingCall);
+    }
+
+    /**
+     * Asynchronous method-call object for canCommitOrThrow. It keeps the request arguments,
+     * writes the request message, and decodes the reply once the response has been read.
+     */
+    public static class canCommitOrThrow_call extends org.apache.thrift.async.TAsyncMethodCall {
+      private final TTransaction tx;
+      private final Set<ByteBuffer> changes;
+
+      /**
+       * @param tx the transaction to send
+       * @param changes the change-set keys to send
+       * @param resultHandler callback handed to the superclass
+       * @param client the async client issuing this call
+       * @param protocolFactory protocol factory handed to the superclass
+       * @param transport non-blocking transport handed to the superclass
+       */
+      public canCommitOrThrow_call(TTransaction tx, Set<ByteBuffer> changes, org.apache.thrift.async.AsyncMethodCallback<canCommitOrThrow_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        // NOTE(review): the trailing 'false' is presumably TAsyncMethodCall's one-way flag — confirm against the Thrift version in use.
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.changes = changes;
+        this.tx = tx;
+      }
+
+      /** Writes the CALL message for "canCommitOrThrow" (sequence id 0) followed by the argument struct. */
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        final org.apache.thrift.protocol.TMessage header =
+            new org.apache.thrift.protocol.TMessage("canCommitOrThrow", org.apache.thrift.protocol.TMessageType.CALL, 0);
+        prot.writeMessageBegin(header);
+        new canCommitOrThrow_args().setTx(tx).setChanges(changes).write(prot);
+        prot.writeMessageEnd();
+      }
+
+      /**
+       * Decodes the buffered response frame with a synchronous Client.
+       *
+       * @return the reply's success value
+       * @throws IllegalStateException if the call has not reached the RESPONSE_READ state
+       */
+      public TBoolean getResult() throws TTransactionNotInProgressException, TGenericException, org.apache.thrift.TException {
+        final boolean responseRead = getState() == org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ;
+        if (!responseRead) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        final org.apache.thrift.transport.TMemoryInputTransport frame =
+            new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        final org.apache.thrift.protocol.TProtocol frameProtocol = client.getProtocolFactory().getProtocol(frame);
+        final Client decoder = new Client(frameProtocol);
+        return decoder.recv_canCommitOrThrow();
+      }
+    }
+
     public void commitTx(TTransaction tx, org.apache.thrift.async.AsyncMethodCallback<commitTx_call> resultHandler) throws org.apache.thrift.TException {
       checkReady();
       commitTx_call method_call = new commitTx_call(tx, resultHandler, this, ___protocolFactory, ___transport);
@@ -1236,6 +1305,7 @@ public class TTransactionServer {
       processMap.put("startShortWithClientIdAndTimeOut", new startShortWithClientIdAndTimeOut());
       processMap.put("startShortWithTimeout", new startShortWithTimeout());
       processMap.put("canCommitTx", new canCommitTx());
+      processMap.put("canCommitOrThrow", new canCommitOrThrow());
       processMap.put("commitTx", new commitTx());
       processMap.put("abortTx", new abortTx());
       processMap.put("invalidateTx", new invalidateTx());
@@ -1430,6 +1500,32 @@ public class TTransactionServer {
       }
     }
 
+    /**
+     * Server-side process function for "canCommitOrThrow": invokes the handler and packs its
+     * return value or declared exception into a canCommitOrThrow_result.
+     */
+    public static class canCommitOrThrow<I extends Iface> extends org.apache.thrift.ProcessFunction<I, canCommitOrThrow_args> {
+      public canCommitOrThrow() {
+        super("canCommitOrThrow");
+      }
+
+      /** Supplies a fresh, empty argument struct. */
+      public canCommitOrThrow_args getEmptyArgsInstance() {
+        final canCommitOrThrow_args blank = new canCommitOrThrow_args();
+        return blank;
+      }
+
+      /** This method is not one-way. */
+      protected boolean isOneway() {
+        return false;
+      }
+
+      /**
+       * Calls iface.canCommitOrThrow with the decoded arguments. The two declared exceptions are
+       * stored in result fields e and g instead of being thrown; anything else propagates.
+       */
+      public canCommitOrThrow_result getResult(I iface, canCommitOrThrow_args args) throws org.apache.thrift.TException {
+        final canCommitOrThrow_result outcome = new canCommitOrThrow_result();
+        try {
+          outcome.success = iface.canCommitOrThrow(args.tx, args.changes);
+        } catch (TTransactionNotInProgressException notInProgress) {
+          outcome.e = notInProgress;
+        } catch (TGenericException generic) {
+          outcome.g = generic;
+        }
+        return outcome;
+      }
+    }
+
     public static class commitTx<I extends Iface> extends org.apache.thrift.ProcessFunction<I, commitTx_args> {
       public commitTx() {
         super("commitTx");
@@ -7921,22 +8017,25 @@ public class TTransactionServer {
 
   }
 
-  public static class commitTx_args implements org.apache.thrift.TBase<commitTx_args, commitTx_args._Fields>, java.io.Serializable, Cloneable   {
-    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("commitTx_args");
+  public static class canCommitOrThrow_args implements org.apache.thrift.TBase<canCommitOrThrow_args, canCommitOrThrow_args._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("canCommitOrThrow_args");
 
     private static final org.apache.thrift.protocol.TField TX_FIELD_DESC = new org.apache.thrift.protocol.TField("tx", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+    private static final org.apache.thrift.protocol.TField CHANGES_FIELD_DESC = new org.apache.thrift.protocol.TField("changes", org.apache.thrift.protocol.TType.SET, (short)2);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
-      schemes.put(StandardScheme.class, new commitTx_argsStandardSchemeFactory());
-      schemes.put(TupleScheme.class, new commitTx_argsTupleSchemeFactory());
+      schemes.put(StandardScheme.class, new canCommitOrThrow_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new canCommitOrThrow_argsTupleSchemeFactory());
     }
 
     public TTransaction tx; // required
+    public Set<ByteBuffer> changes; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-      TX((short)1, "tx");
+      TX((short)1, "tx"),
+      CHANGES((short)2, "changes");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -7953,6 +8052,8 @@ public class TTransactionServer {
         switch(fieldId) {
           case 1: // TX
             return TX;
+          case 2: // CHANGES
+            return CHANGES;
           default:
             return null;
         }
@@ -7998,43 +8099,58 @@ public class TTransactionServer {
       Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
       tmpMap.put(_Fields.TX, new org.apache.thrift.meta_data.FieldMetaData("tx", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TTransaction.class)));
+      tmpMap.put(_Fields.CHANGES, new org.apache.thrift.meta_data.FieldMetaData("changes", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.SetMetaData(org.apache.thrift.protocol.TType.SET, 
+              new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING              , true))));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
-      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(commitTx_args.class, metaDataMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(canCommitOrThrow_args.class, metaDataMap);
     }
 
-    public commitTx_args() {
+    public canCommitOrThrow_args() {
     }
 
-    public commitTx_args(
-      TTransaction tx)
+    public canCommitOrThrow_args(
+      TTransaction tx,
+      Set<ByteBuffer> changes)
     {
       this();
       this.tx = tx;
+      this.changes = changes;
     }
 
     /**
      * Performs a deep copy on <i>other</i>.
      */
-    public commitTx_args(commitTx_args other) {
+    public canCommitOrThrow_args(canCommitOrThrow_args other) {
       if (other.isSetTx()) {
         this.tx = new TTransaction(other.tx);
       }
+      if (other.isSetChanges()) {
+        Set<ByteBuffer> __this__changes = new HashSet<ByteBuffer>();
+        for (ByteBuffer other_element : other.changes) {
+          ByteBuffer temp_binary_element = org.apache.thrift.TBaseHelper.copyBinary(other_element);
+;
+          __this__changes.add(temp_binary_element);
+        }
+        this.changes = __this__changes;
+      }
     }
 
-    public commitTx_args deepCopy() {
-      return new commitTx_args(this);
+    public canCommitOrThrow_args deepCopy() {
+      return new canCommitOrThrow_args(this);
     }
 
     @Override
     public void clear() {
       this.tx = null;
+      this.changes = null;
     }
 
     public TTransaction getTx() {
       return this.tx;
     }
 
-    public commitTx_args setTx(TTransaction tx) {
+    public canCommitOrThrow_args setTx(TTransaction tx) {
       this.tx = tx;
       return this;
     }
@@ -8054,6 +8170,45 @@ public class TTransactionServer {
       }
     }
 
+    /** Returns the number of elements in changes, or 0 when the field is unset (null). */
+    public int getChangesSize() {
+      if (changes == null) {
+        return 0;
+      }
+      return changes.size();
+    }
+
+    /** Returns an iterator over changes, or null when the field is unset (null). */
+    public java.util.Iterator<ByteBuffer> getChangesIterator() {
+      if (changes == null) {
+        return null;
+      }
+      return changes.iterator();
+    }
+
+    /**
+     * Adds one element to changes, first creating an empty HashSet if the field is unset.
+     *
+     * @param elem the element to add
+     */
+    public void addToChanges(ByteBuffer elem) {
+      Set<ByteBuffer> target = this.changes;
+      if (target == null) {
+        target = new HashSet<ByteBuffer>();
+        this.changes = target;
+      }
+      target.add(elem);
+    }
+
+    /** Returns the changes field as stored (no copy); null when it is unset. */
+    public Set<ByteBuffer> getChanges() {
+      return changes;
+    }
+
+    /**
+     * Stores the given set in the changes field (no copy is made).
+     *
+     * @param changes the new value; null leaves the field unset
+     * @return this struct, so setters can be chained
+     */
+    public canCommitOrThrow_args setChanges(Set<ByteBuffer> changes) {
+      final canCommitOrThrow_args self = this;
+      self.changes = changes;
+      return self;
+    }
+
+    /** Marks changes as unset by clearing the field to null. */
+    public void unsetChanges() {
+      changes = null;
+    }
+
+    /** Reports whether changes currently holds a value; a null field counts as unset. */
+    public boolean isSetChanges() {
+      return null != changes;
+    }
+
+    /**
+     * Clears changes when called with false; a true argument leaves the field untouched.
+     *
+     * @param value false to unset the field
+     */
+    public void setChangesIsSet(boolean value) {
+      if (value) {
+        return;
+      }
+      changes = null;
+    }
+
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
       case TX:
@@ -8064,6 +8219,14 @@ public class TTransactionServer {
         }
         break;
 
+      case CHANGES:
+        if (value == null) {
+          unsetChanges();
+        } else {
+          setChanges((Set<ByteBuffer>)value);
+        }
+        break;
+
       }
     }
 
@@ -8072,6 +8235,9 @@ public class TTransactionServer {
       case TX:
         return getTx();
 
+      case CHANGES:
+        return getChanges();
+
       }
       throw new IllegalStateException();
     }
@@ -8085,6 +8251,8 @@ public class TTransactionServer {
       switch (field) {
       case TX:
         return isSetTx();
+      case CHANGES:
+        return isSetChanges();
       }
       throw new IllegalStateException();
     }
@@ -8093,12 +8261,12 @@ public class TTransactionServer {
     public boolean equals(Object that) {
       if (that == null)
         return false;
-      if (that instanceof commitTx_args)
-        return this.equals((commitTx_args)that);
+      if (that instanceof canCommitOrThrow_args)
+        return this.equals((canCommitOrThrow_args)that);
       return false;
     }
 
-    public boolean equals(commitTx_args that) {
+    public boolean equals(canCommitOrThrow_args that) {
       if (that == null)
         return false;
 
@@ -8111,6 +8279,15 @@ public class TTransactionServer {
           return false;
       }
 
+      boolean this_present_changes = true && this.isSetChanges();
+      boolean that_present_changes = true && that.isSetChanges();
+      if (this_present_changes || that_present_changes) {
+        if (!(this_present_changes && that_present_changes))
+          return false;
+        if (!this.changes.equals(that.changes))
+          return false;
+      }
+
       return true;
     }
 
@@ -8119,13 +8296,13 @@ public class TTransactionServer {
       return 0;
     }
 
-    public int compareTo(commitTx_args other) {
+    public int compareTo(canCommitOrThrow_args other) {
       if (!getClass().equals(other.getClass())) {
         return getClass().getName().compareTo(other.getClass().getName());
       }
 
       int lastComparison = 0;
-      commitTx_args typedOther = (commitTx_args)other;
+      canCommitOrThrow_args typedOther = (canCommitOrThrow_args)other;
 
       lastComparison = Boolean.valueOf(isSetTx()).compareTo(typedOther.isSetTx());
       if (lastComparison != 0) {
@@ -8137,6 +8314,16 @@ public class TTransactionServer {
           return lastComparison;
         }
       }
+      lastComparison = Boolean.valueOf(isSetChanges()).compareTo(typedOther.isSetChanges());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetChanges()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.changes, typedOther.changes);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -8154,7 +8341,7 @@ public class TTransactionServer {
 
     @Override
     public String toString() {
-      StringBuilder sb = new StringBuilder("commitTx_args(");
+      StringBuilder sb = new StringBuilder("canCommitOrThrow_args(");
       boolean first = true;
 
       sb.append("tx:");
@@ -8164,6 +8351,14 @@ public class TTransactionServer {
         sb.append(this.tx);
       }
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("changes:");
+      if (this.changes == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.changes);
+      }
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -8192,15 +8387,15 @@ public class TTransactionServer {
       }
     }
 
-    private static class commitTx_argsStandardSchemeFactory implements SchemeFactory {
-      public commitTx_argsStandardScheme getScheme() {
-        return new commitTx_argsStandardScheme();
+    private static class canCommitOrThrow_argsStandardSchemeFactory implements SchemeFactory {
+      public canCommitOrThrow_argsStandardScheme getScheme() {
+        return new canCommitOrThrow_argsStandardScheme();
       }
     }
 
-    private static class commitTx_argsStandardScheme extends StandardScheme<commitTx_args> {
+    private static class canCommitOrThrow_argsStandardScheme extends StandardScheme<canCommitOrThrow_args> {
 
-      public void read(org.apache.thrift.protocol.TProtocol iprot, commitTx_args struct) throws org.apache.thrift.TException {
+      public void read(org.apache.thrift.protocol.TProtocol iprot, canCommitOrThrow_args struct) throws org.apache.thrift.TException {
         org.apache.thrift.protocol.TField schemeField;
         iprot.readStructBegin();
         while (true)
@@ -8219,6 +8414,24 @@ public class TTransactionServer {
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
+            case 2: // CHANGES
+              if (schemeField.type == org.apache.thrift.protocol.TType.SET) {
+                {
+                  org.apache.thrift.protocol.TSet _set32 = iprot.readSetBegin();
+                  struct.changes = new HashSet<ByteBuffer>(2*_set32.size);
+                  for (int _i33 = 0; _i33 < _set32.size; ++_i33)
+                  {
+                    ByteBuffer _elem34; // required
+                    _elem34 = iprot.readBinary();
+                    struct.changes.add(_elem34);
+                  }
+                  iprot.readSetEnd();
+                }
+                struct.setChangesIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
           }
@@ -8230,7 +8443,7 @@ public class TTransactionServer {
         struct.validate();
       }
 
-      public void write(org.apache.thrift.protocol.TProtocol oprot, commitTx_args struct) throws org.apache.thrift.TException {
+      public void write(org.apache.thrift.protocol.TProtocol oprot, canCommitOrThrow_args struct) throws org.apache.thrift.TException {
         struct.validate();
 
         oprot.writeStructBegin(STRUCT_DESC);
@@ -8239,66 +8452,106 @@ public class TTransactionServer {
           struct.tx.write(oprot);
           oprot.writeFieldEnd();
         }
+        if (struct.changes != null) {
+          oprot.writeFieldBegin(CHANGES_FIELD_DESC);
+          {
+            oprot.writeSetBegin(new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.STRING, struct.changes.size()));
+            for (ByteBuffer _iter35 : struct.changes)
+            {
+              oprot.writeBinary(_iter35);
+            }
+            oprot.writeSetEnd();
+          }
+          oprot.writeFieldEnd();
+        }
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
 
     }
 
-    private static class commitTx_argsTupleSchemeFactory implements SchemeFactory {
-      public commitTx_argsTupleScheme getScheme() {
-        return new commitTx_argsTupleScheme();
+    private static class canCommitOrThrow_argsTupleSchemeFactory implements SchemeFactory {
+      public canCommitOrThrow_argsTupleScheme getScheme() {
+        return new canCommitOrThrow_argsTupleScheme();
       }
     }
 
-    private static class commitTx_argsTupleScheme extends TupleScheme<commitTx_args> {
+    private static class canCommitOrThrow_argsTupleScheme extends TupleScheme<canCommitOrThrow_args> {
 
       @Override
-      public void write(org.apache.thrift.protocol.TProtocol prot, commitTx_args struct) throws org.apache.thrift.TException {
+      public void write(org.apache.thrift.protocol.TProtocol prot, canCommitOrThrow_args struct) throws org.apache.thrift.TException {
         TTupleProtocol oprot = (TTupleProtocol) prot;
         BitSet optionals = new BitSet();
         if (struct.isSetTx()) {
           optionals.set(0);
         }
-        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetChanges()) {
+          optionals.set(1);
+        }
+        oprot.writeBitSet(optionals, 2);
         if (struct.isSetTx()) {
           struct.tx.write(oprot);
         }
+        if (struct.isSetChanges()) {
+          {
+            oprot.writeI32(struct.changes.size());
+            for (ByteBuffer _iter36 : struct.changes)
+            {
+              oprot.writeBinary(_iter36);
+            }
+          }
+        }
       }
 
       @Override
-      public void read(org.apache.thrift.protocol.TProtocol prot, commitTx_args struct) throws org.apache.thrift.TException {
+      public void read(org.apache.thrift.protocol.TProtocol prot, canCommitOrThrow_args struct) throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
-        BitSet incoming = iprot.readBitSet(1);
+        BitSet incoming = iprot.readBitSet(2);
         if (incoming.get(0)) {
           struct.tx = new TTransaction();
           struct.tx.read(iprot);
           struct.setTxIsSet(true);
         }
+        if (incoming.get(1)) {
+          {
+            org.apache.thrift.protocol.TSet _set37 = new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+            struct.changes = new HashSet<ByteBuffer>(2*_set37.size);
+            for (int _i38 = 0; _i38 < _set37.size; ++_i38)
+            {
+              ByteBuffer _elem39; // required
+              _elem39 = iprot.readBinary();
+              struct.changes.add(_elem39);
+            }
+          }
+          struct.setChangesIsSet(true);
+        }
       }
     }
 
   }
 
-  public static class commitTx_result implements org.apache.thrift.TBase<commitTx_result, commitTx_result._Fields>, java.io.Serializable, Cloneable   {
-    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("commitTx_result");
+  public static class canCommitOrThrow_result implements org.apache.thrift.TBase<canCommitOrThrow_result, canCommitOrThrow_result._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("canCommitOrThrow_result");
 
     private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRUCT, (short)0);
     private static final org.apache.thrift.protocol.TField E_FIELD_DESC = new org.apache.thrift.protocol.TField("e", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+    private static final org.apache.thrift.protocol.TField G_FIELD_DESC = new org.apache.thrift.protocol.TField("g", org.apache.thrift.protocol.TType.STRUCT, (short)2);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
-      schemes.put(StandardScheme.class, new commitTx_resultStandardSchemeFactory());
-      schemes.put(TupleScheme.class, new commitTx_resultTupleSchemeFactory());
+      schemes.put(StandardScheme.class, new canCommitOrThrow_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new canCommitOrThrow_resultTupleSchemeFactory());
     }
 
     public TBoolean success; // required
     public TTransactionNotInProgressException e; // required
+    public TGenericException g; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
       SUCCESS((short)0, "success"),
-      E((short)1, "e");
+      E((short)1, "e"),
+      G((short)2, "g");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -8317,6 +8570,8 @@ public class TTransactionServer {
             return SUCCESS;
           case 1: // E
             return E;
+          case 2: // G
+            return G;
           default:
             return null;
         }
@@ -8364,49 +8619,57 @@ public class TTransactionServer {
           new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TBoolean.class)));
       tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+      tmpMap.put(_Fields.G, new org.apache.thrift.meta_data.FieldMetaData("g", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
-      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(commitTx_result.class, metaDataMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(canCommitOrThrow_result.class, metaDataMap);
     }
 
-    public commitTx_result() {
+    public canCommitOrThrow_result() {
     }
 
-    public commitTx_result(
+    public canCommitOrThrow_result(
       TBoolean success,
-      TTransactionNotInProgressException e)
+      TTransactionNotInProgressException e,
+      TGenericException g)
     {
       this();
       this.success = success;
       this.e = e;
+      this.g = g;
     }
 
     /**
      * Performs a deep copy on <i>other</i>.
      */
-    public commitTx_result(commitTx_result other) {
+    public canCommitOrThrow_result(canCommitOrThrow_result other) {
       if (other.isSetSuccess()) {
         this.success = new TBoolean(other.success);
       }
       if (other.isSetE()) {
         this.e = new TTransactionNotInProgressException(other.e);
       }
+      if (other.isSetG()) {
+        this.g = new TGenericException(other.g);
+      }
     }
 
-    public commitTx_result deepCopy() {
-      return new commitTx_result(this);
+    public canCommitOrThrow_result deepCopy() {
+      return new canCommitOrThrow_result(this);
     }
 
     @Override
     public void clear() {
       this.success = null;
       this.e = null;
+      this.g = null;
     }
 
     public TBoolean getSuccess() {
       return this.success;
     }
 
-    public commitTx_result setSuccess(TBoolean success) {
+    public canCommitOrThrow_result setSuccess(TBoolean success) {
       this.success = success;
       return this;
     }
@@ -8430,7 +8693,916 @@ public class TTransactionServer {
       return this.e;
     }
 
-    public commitTx_result setE(TTransactionNotInProgressException e) {
+    /**
+     * Stores the given exception in field e.
+     *
+     * @param e the new value; null leaves the field unset
+     * @return this result, so setters can be chained
+     */
+    public canCommitOrThrow_result setE(TTransactionNotInProgressException e) {
+      final canCommitOrThrow_result self = this;
+      self.e = e;
+      return self;
+    }
+
+    /** Marks field e as unset by clearing it to null. */
+    public void unsetE() {
+      e = null;
+    }
+
+    /** Reports whether field e currently holds a value; a null field counts as unset. */
+    public boolean isSetE() {
+      return null != e;
+    }
+
+    /**
+     * Clears field e when called with false; a true argument leaves the field untouched.
+     *
+     * @param value false to unset the field
+     */
+    public void setEIsSet(boolean value) {
+      if (value) {
+        return;
+      }
+      e = null;
+    }
+
+    /** Returns field g as stored; null when it is unset. */
+    public TGenericException getG() {
+      return g;
+    }
+
+    /**
+     * Stores the given exception in field g.
+     *
+     * @param g the new value; null leaves the field unset
+     * @return this result, so setters can be chained
+     */
+    public canCommitOrThrow_result setG(TGenericException g) {
+      final canCommitOrThrow_result self = this;
+      self.g = g;
+      return self;
+    }
+
+    /** Marks field g as unset by clearing it to null. */
+    public void unsetG() {
+      g = null;
+    }
+
+    /** Reports whether field g currently holds a value; a null field counts as unset. */
+    public boolean isSetG() {
+      return null != g;
+    }
+
+    /**
+     * Clears field g when called with false; a true argument leaves the field untouched.
+     *
+     * @param value false to unset the field
+     */
+    public void setGIsSet(boolean value) {
+      if (value) {
+        return;
+      }
+      g = null;
+    }
+
+    /**
+     * Generic setter: assigns the selected field, or unsets it when value is null.
+     *
+     * @param field selects SUCCESS, E or G
+     * @param value the new value, cast to the selected field's type; null unsets the field
+     */
+    public void setFieldValue(_Fields field, Object value) {
+      final boolean clearing = (value == null);
+      switch (field) {
+      case SUCCESS:
+        if (!clearing) {
+          setSuccess((TBoolean)value);
+        } else {
+          unsetSuccess();
+        }
+        break;
+
+      case E:
+        if (!clearing) {
+          setE((TTransactionNotInProgressException)value);
+        } else {
+          unsetE();
+        }
+        break;
+
+      case G:
+        if (!clearing) {
+          setG((TGenericException)value);
+        } else {
+          unsetG();
+        }
+        break;
+
+      }
+    }
+
+    /**
+     * Generic getter: returns the current value of the selected field.
+     *
+     * @param field selects SUCCESS, E or G
+     * @return the field's value, possibly null
+     * @throws IllegalStateException if the selector matches none of the known fields
+     */
+    public Object getFieldValue(_Fields field) {
+      final Object current;
+      switch (field) {
+      case SUCCESS:
+        current = getSuccess();
+        break;
+
+      case E:
+        current = getE();
+        break;
+
+      case G:
+        current = getG();
+        break;
+
+      default:
+        throw new IllegalStateException();
+      }
+      return current;
+    }
+
+    /**
+     * Reports whether the selected field has been assigned a value.
+     *
+     * @param field selects SUCCESS, E or G
+     * @throws IllegalArgumentException if field is null
+     * @throws IllegalStateException if the selector matches none of the known fields
+     */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      final boolean present;
+      switch (field) {
+      case SUCCESS:
+        present = isSetSuccess();
+        break;
+      case E:
+        present = isSetE();
+        break;
+      case G:
+        present = isSetG();
+        break;
+      default:
+        throw new IllegalStateException();
+      }
+      return present;
+    }
+
+    /**
+     * Object-level equality: only another canCommitOrThrow_result can be equal, and the
+     * comparison itself is delegated to the typed equals overload.
+     */
+    @Override
+    public boolean equals(Object that) {
+      // instanceof is false for null, which covers the null case as well.
+      return (that instanceof canCommitOrThrow_result)
+          && this.equals((canCommitOrThrow_result)that);
+    }
+
+    /**
+     * Field-by-field equality over success, e and g. Two results are equal when each field is
+     * either unset on both sides or set on both sides with equal values.
+     *
+     * @param that the result to compare with; null is never equal
+     */
+    public boolean equals(canCommitOrThrow_result that) {
+      if (that == null) {
+        return false;
+      }
+
+      // success
+      if (this.isSetSuccess() != that.isSetSuccess()) {
+        return false;
+      }
+      if (this.isSetSuccess() && !this.success.equals(that.success)) {
+        return false;
+      }
+
+      // e
+      if (this.isSetE() != that.isSetE()) {
+        return false;
+      }
+      if (this.isSetE() && !this.e.equals(that.e)) {
+        return false;
+      }
+
+      // g
+      if (this.isSetG() != that.isSetG()) {
+        return false;
+      }
+      if (this.isSetG() && !this.g.equals(that.g)) {
+        return false;
+      }
+
+      return true;
+    }
+
+    /**
+     * Hash code computed from success, e and g — the same three fields the typed equals
+     * compares — so results that differ in a field usually land in different hash buckets.
+     * The previous constant 0 made every instance collide.
+     *
+     * Unset (null) fields contribute 0. Equal results still hash equally provided the field
+     * types keep their own equals/hashCode consistent. As with any mutable key, changing a
+     * field while the object sits in a hash-based collection changes its hash.
+     */
+    @Override
+    public int hashCode() {
+      return java.util.Arrays.hashCode(new Object[] {success, e, g});
+    }
+
+    /**
+     * Orders results field by field in the sequence success, e, g.
+     *
+     * Objects of different runtime classes are ordered by class name. For each field, the
+     * set/unset flags are compared first; the values are compared only when the flags match
+     * and the field is set. The first non-zero comparison decides.
+     *
+     * @param other the result to compare against
+     * @return negative, zero or positive per the usual compareTo convention
+     */
+    public int compareTo(canCommitOrThrow_result other) {
+      final Class<?> mine = getClass();
+      final Class<?> theirs = other.getClass();
+      if (!mine.equals(theirs)) {
+        return mine.getName().compareTo(theirs.getName());
+      }
+
+      // success
+      int order = Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess());
+      if (order == 0 && isSetSuccess()) {
+        order = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success);
+      }
+      if (order != 0) {
+        return order;
+      }
+
+      // e
+      order = Boolean.valueOf(isSetE()).compareTo(other.isSetE());
+      if (order == 0 && isSetE()) {
+        order = org.apache.thrift.TBaseHelper.compareTo(this.e, other.e);
+      }
+      if (order != 0) {
+        return order;
+      }
+
+      // g is the last field, so its outcome (zero included) is the final answer
+      order = Boolean.valueOf(isSetG()).compareTo(other.isSetG());
+      if (order == 0 && isSetG()) {
+        order = org.apache.thrift.TBaseHelper.compareTo(this.g, other.g);
+      }
+      return order;
+    }
+
+    /** Looks up the _Fields constant for a Thrift field id by delegating to _Fields.findByThriftId. */
+    public _Fields fieldForId(int fieldId) {
+      final _Fields match = _Fields.findByThriftId(fieldId);
+      return match;
+    }
+
+    /**
+     * Populates this struct from the protocol, using the scheme registered for the protocol's
+     * scheme class.
+     */
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      final SchemeFactory factory = schemes.get(iprot.getScheme());
+      factory.getScheme().read(iprot, this);
+    }
+
+    /**
+     * Serializes this struct to the protocol, using the scheme registered for the protocol's
+     * scheme class.
+     */
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      final SchemeFactory factory = schemes.get(oprot.getScheme());
+      factory.getScheme().write(oprot, this);
+    }
+
+    /**
+     * Renders the struct as canCommitOrThrow_result(success:..., e:..., g:...), printing the
+     * text "null" for any field that is unset.
+     */
+    @Override
+    public String toString() {
+      // String.valueOf yields "null" for a null reference, matching the explicit null branches.
+      return new StringBuilder("canCommitOrThrow_result(")
+          .append("success:").append(String.valueOf(this.success))
+          .append(", ")
+          .append("e:").append(String.valueOf(this.e))
+          .append(", ")
+          .append("g:").append(String.valueOf(this.g))
+          .append(")")
+          .toString();
+    }
+
+    /**
+     * Validates this struct. There are no required-field checks here; the only work is
+     * validating the nested success struct when it is present.
+     */
+    public void validate() throws org.apache.thrift.TException {
+      if (success == null) {
+        return;
+      }
+      success.validate();
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class canCommitOrThrow_resultStandardSchemeFactory implements SchemeFactory {
+      public canCommitOrThrow_resultStandardScheme getScheme() {
+        return new canCommitOrThrow_resultStandardScheme();
+      }
+    }
+
+    private static class canCommitOrThrow_resultStandardScheme extends StandardScheme<canCommitOrThrow_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, canCommitOrThrow_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 0: // SUCCESS
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.success = new TBoolean();
+                struct.success.read(iprot);
+                struct.setSuccessIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 1: // E
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.e = new TTransactionNotInProgressException();
+                struct.e.read(iprot);
+                struct.setEIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 2: // G
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.g = new TGenericException();
+                struct.g.read(iprot);
+                struct.setGIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, canCommitOrThrow_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.success != null) {
+          oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+          struct.success.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        if (struct.e != null) {
+          oprot.writeFieldBegin(E_FIELD_DESC);
+          struct.e.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        if (struct.g != null) {
+          oprot.writeFieldBegin(G_FIELD_DESC);
+          struct.g.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class canCommitOrThrow_resultTupleSchemeFactory implements SchemeFactory {
+      public canCommitOrThrow_resultTupleScheme getScheme() {
+        return new canCommitOrThrow_resultTupleScheme();
+      }
+    }
+
+    private static class canCommitOrThrow_resultTupleScheme extends TupleScheme<canCommitOrThrow_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, canCommitOrThrow_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.isSetSuccess()) {
+          optionals.set(0);
+        }
+        if (struct.isSetE()) {
+          optionals.set(1);
+        }
+        if (struct.isSetG()) {
+          optionals.set(2);
+        }
+        oprot.writeBitSet(optionals, 3);
+        if (struct.isSetSuccess()) {
+          struct.success.write(oprot);
+        }
+        if (struct.isSetE()) {
+          struct.e.write(oprot);
+        }
+        if (struct.isSetG()) {
+          struct.g.write(oprot);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, canCommitOrThrow_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(3);
+        if (incoming.get(0)) {
+          struct.success = new TBoolean();
+          struct.success.read(iprot);
+          struct.setSuccessIsSet(true);
+        }
+        if (incoming.get(1)) {
+          struct.e = new TTransactionNotInProgressException();
+          struct.e.read(iprot);
+          struct.setEIsSet(true);
+        }
+        if (incoming.get(2)) {
+          struct.g = new TGenericException();
+          struct.g.read(iprot);
+          struct.setGIsSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class commitTx_args implements org.apache.thrift.TBase<commitTx_args, commitTx_args._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("commitTx_args");
+
+    private static final org.apache.thrift.protocol.TField TX_FIELD_DESC = new org.apache.thrift.protocol.TField("tx", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new commitTx_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new commitTx_argsTupleSchemeFactory());
+    }
+
+    public TTransaction tx; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      TX((short)1, "tx");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // TX
+            return TX;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.TX, new org.apache.thrift.meta_data.FieldMetaData("tx", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TTransaction.class)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(commitTx_args.class, metaDataMap);
+    }
+
+    public commitTx_args() {
+    }
+
+    public commitTx_args(
+      TTransaction tx)
+    {
+      this();
+      this.tx = tx;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public commitTx_args(commitTx_args other) {
+      if (other.isSetTx()) {
+        this.tx = new TTransaction(other.tx);
+      }
+    }
+
+    public commitTx_args deepCopy() {
+      return new commitTx_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.tx = null;
+    }
+
+    public TTransaction getTx() {
+      return this.tx;
+    }
+
+    public commitTx_args setTx(TTransaction tx) {
+      this.tx = tx;
+      return this;
+    }
+
+    public void unsetTx() {
+      this.tx = null;
+    }
+
+    /** Returns true if field tx is set (has been assigned a value) and false otherwise */
+    public boolean isSetTx() {
+      return this.tx != null;
+    }
+
+    public void setTxIsSet(boolean value) {
+      if (!value) {
+        this.tx = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case TX:
+        if (value == null) {
+          unsetTx();
+        } else {
+          setTx((TTransaction)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case TX:
+        return getTx();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case TX:
+        return isSetTx();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof commitTx_args)
+        return this.equals((commitTx_args)that);
+      return false;
+    }
+
+    public boolean equals(commitTx_args that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_tx = true && this.isSetTx();
+      boolean that_present_tx = true && that.isSetTx();
+      if (this_present_tx || that_present_tx) {
+        if (!(this_present_tx && that_present_tx))
+          return false;
+        if (!this.tx.equals(that.tx))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(commitTx_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      commitTx_args typedOther = (commitTx_args)other;
+
+      lastComparison = Boolean.valueOf(isSetTx()).compareTo(typedOther.isSetTx());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetTx()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tx, typedOther.tx);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("commitTx_args(");
+      boolean first = true;
+
+      sb.append("tx:");
+      if (this.tx == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.tx);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+      if (tx != null) {
+        tx.validate();
+      }
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class commitTx_argsStandardSchemeFactory implements SchemeFactory {
+      public commitTx_argsStandardScheme getScheme() {
+        return new commitTx_argsStandardScheme();
+      }
+    }
+
+    private static class commitTx_argsStandardScheme extends StandardScheme<commitTx_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, commitTx_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // TX
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.tx = new TTransaction();
+                struct.tx.read(iprot);
+                struct.setTxIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, commitTx_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.tx != null) {
+          oprot.writeFieldBegin(TX_FIELD_DESC);
+          struct.tx.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class commitTx_argsTupleSchemeFactory implements SchemeFactory {
+      public commitTx_argsTupleScheme getScheme() {
+        return new commitTx_argsTupleScheme();
+      }
+    }
+
+    private static class commitTx_argsTupleScheme extends TupleScheme<commitTx_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, commitTx_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.isSetTx()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetTx()) {
+          struct.tx.write(oprot);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, commitTx_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.tx = new TTransaction();
+          struct.tx.read(iprot);
+          struct.setTxIsSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class commitTx_result implements org.apache.thrift.TBase<commitTx_result, commitTx_result._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("commitTx_result");
+
+    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRUCT, (short)0);
+    private static final org.apache.thrift.protocol.TField E_FIELD_DESC = new org.apache.thrift.protocol.TField("e", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new commitTx_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new commitTx_resultTupleSchemeFactory());
+    }
+
+    public TBoolean success; // required
+    public TTransactionNotInProgressException e; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      SUCCESS((short)0, "success"),
+      E((short)1, "e");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 0: // SUCCESS
+            return SUCCESS;
+          case 1: // E
+            return E;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TBoolean.class)));
+      tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(commitTx_result.class, metaDataMap);
+    }
+
+    public commitTx_result() {
+    }
+
+    public commitTx_result(
+      TBoolean success,
+      TTransactionNotInProgressException e)
+    {
+      this();
+      this.success = success;
+      this.e = e;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public commitTx_result(commitTx_result other) {
+      if (other.isSetSuccess()) {
+        this.success = new TBoolean(other.success);
+      }
+      if (other.isSetE()) {
+        this.e = new TTransactionNotInProgressException(other.e);
+      }
+    }
+
+    public commitTx_result deepCopy() {
+      return new commitTx_result(this);
+    }
+
+    @Override
+    public void clear() {
+      this.success = null;
+      this.e = null;
+    }
+
+    public TBoolean getSuccess() {
+      return this.success;
+    }
+
+    public commitTx_result setSuccess(TBoolean success) {
+      this.success = success;
+      return this;
+    }
+
+    public void unsetSuccess() {
+      this.success = null;
+    }
+
+    /** Returns true if field success is set (has been assigned a value) and false otherwise */
+    public boolean isSetSuccess() {
+      return this.success != null;
+    }
+
+    public void setSuccessIsSet(boolean value) {
+      if (!value) {
+        this.success = null;
+      }
+    }
+
+    public TTransactionNotInProgressException getE() {
+      return this.e;
+    }
+
+    public commitTx_result setE(TTransactionNotInProgressException e) {
       this.e = e;
       return this;
     }
@@ -12166,13 +13338,13 @@ public class TTransactionServer {
             case 1: // TXNS
               if (schemeField.type == org.apache.thrift.protocol.TType.SET) {
                 {
-                  org.apache.thrift.protocol.TSet _set32 = iprot.readSetBegin();
-                  struct.txns = new HashSet<Long>(2*_set32.size);
-                  for (int _i33 = 0; _i33 < _set32.size; ++_i33)
+                  org.apache.thrift.protocol.TSet _set40 = iprot.readSetBegin();
+                  struct.txns = new HashSet<Long>(2*_set40.size);
+                  for (int _i41 = 0; _i41 < _set40.size; ++_i41)
                   {
-                    long _elem34; // required
-                    _elem34 = iprot.readI64();
-                    struct.txns.add(_elem34);
+                    long _elem42; // required
+                    _elem42 = iprot.readI64();
+                    struct.txns.add(_elem42);
                   }
                   iprot.readSetEnd();
                 }
@@ -12200,9 +13372,9 @@ public class TTransactionServer {
           oprot.writeFieldBegin(TXNS_FIELD_DESC);
           {
             oprot.writeSetBegin(new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.I64, struct.txns.size()));
-            for (long _iter35 : struct.txns)
+            for (long _iter43 : struct.txns)
             {
-              oprot.writeI64(_iter35);
+              oprot.writeI64(_iter43);
             }
             oprot.writeSetEnd();
           }
@@ -12233,9 +13405,9 @@ public class TTransactionServer {
         if (struct.isSetTxns()) {
           {
             oprot.writeI32(struct.txns.size());
-            for (long _iter36 : struct.txns)
+            for (long _iter44 : struct.txns)
             {
-              oprot.writeI64(_iter36);
+              oprot.writeI64(_iter44);
             }
           }
         }
@@ -12247,13 +13419,13 @@ public class TTransactionServer {
         BitSet incoming = iprot.readBitSet(1);
         if (incoming.get(0)) {
           {
-            org.apache.thrift.protocol.TSet _set37 = new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.I64, iprot.readI32());
-            struct.txns = new HashSet<Long>(2*_set37.size);
-            for (int _i38 = 0; _i38 < _set37.size; ++_i38)
+            org.apache.thrift.protocol.TSet _set45 = new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+            struct.txns = new HashSet<Long>(2*_set45.size);
+            for (int _i46 = 0; _i46 < _set45.size; ++_i46)
             {
-              long _elem39; // required
-              _elem39 = iprot.readI64();
-              struct.txns.add(_elem39);
+              long _elem47; // required
+              _elem47 = iprot.readI64();
+              struct.txns.add(_elem47);
             }
           }
           struct.setTxnsIsSet(true);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
index 0a8ed96..dd17431 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
@@ -88,6 +88,11 @@ public class DetachedTxSystemClient implements TransactionSystemClient {
   }
 
   @Override
+  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
+    return true;
+  }
+
+  @Override
   public boolean commit(Transaction tx) {
     return true;
   }