You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/09 07:12:25 UTC

[1/2] incubator-tephra git commit: (TEPHRA-241) Add a way to limit the size of a transaction

Repository: incubator-tephra
Updated Branches:
  refs/heads/master 8532076f8 -> ae6ce2b5e


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
index 9719bcc..9e57de8 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
@@ -22,8 +22,10 @@ import com.google.inject.Inject;
 import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionFailureException;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSizeException;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
 import org.slf4j.Logger;
@@ -67,6 +69,15 @@ public class InMemoryTxSystemClient implements TransactionSystemClient {
 
   @Override
   public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
+    try {
+      return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
+    } catch (TransactionSizeException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException {
     return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
index b54e57f..de46f27 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
@@ -61,6 +61,11 @@ public class MinimalTxSystemClient implements TransactionSystemClient {
   }
 
   @Override
+  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
+    return true;
+  }
+
+  @Override
   public boolean commit(Transaction tx) {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/thrift/transaction.thrift
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift
index 0e05244..729e035 100644
--- a/tephra-core/src/main/thrift/transaction.thrift
+++ b/tephra-core/src/main/thrift/transaction.thrift
@@ -76,6 +76,8 @@ service TTransactionServer {
   TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws (1:TGenericException e),
   TTransaction startShortWithTimeout(1: i32 timeout) throws (1:TGenericException e),
   TBoolean canCommitTx(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException e),
+  TBoolean canCommitOrThrow(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException e,
+                                                                                2:TGenericException g,),
   TBoolean commitTx(1: TTransaction tx) throws (1:TTransactionNotInProgressException e),
   void abortTx(1: TTransaction tx),
   bool invalidateTx(1: i64 tx),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
index f4437c2..0d7d9bf 100644
--- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
@@ -52,7 +52,6 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class ThriftTransactionSystemTest extends TransactionSystemTest {
@@ -74,13 +73,12 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest {
     zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
     zkServer.startAndWait();
 
-    Configuration conf = new Configuration();
+    Configuration conf = getCommonConfiguration();
     conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
     conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
     // we want to use a retry strategy that lets us query the number of times it retried:
     conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, CountingRetryStrategyProvider.class.getName());
     conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 2);
-    conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit
 
     Injector injector = Guice.createInjector(
       new ConfigModule(conf),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
index 56a9076..5f4675b 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
@@ -634,6 +634,11 @@ public class TransactionContextTest {
     }
 
     @Override
+    public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException {
+      return canCommit(tx, changeIds);
+    }
+
+    @Override
     public boolean commit(Transaction tx) throws TransactionNotInProgressException {
       if (failCommits-- > 0) {
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
index b96b779..676774c 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
@@ -548,6 +548,11 @@ public class TransactionExecutorTest {
     }
 
     @Override
+    public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException {
+      return canCommit(tx, changeIds);
+    }
+
+    @Override
     public boolean commit(Transaction tx) throws TransactionNotInProgressException {
       if (failCommits-- > 0) {
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
index 1fca773..819a981 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
@@ -26,11 +26,11 @@ import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.InMemoryTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
@@ -39,10 +39,10 @@ import java.util.concurrent.TimeUnit;
  */
 public class TransactionManagerTest extends TransactionSystemTest {
 
-  static Configuration conf = new Configuration();
+  private static Configuration conf;
 
-  TransactionManager txManager = null;
-  TransactionStateStorage txStateStorage = null;
+  private static TransactionManager txManager = null;
+  private static TransactionStateStorage txStateStorage = null;
 
   @Override
   protected TransactionSystemClient getClient() {
@@ -54,10 +54,10 @@ public class TransactionManagerTest extends TransactionSystemTest {
     return txStateStorage;
   }
 
-  @Before
-  public void before() {
+  @BeforeClass
+  public static void beforeClass() {
+    conf = getCommonConfiguration();
     conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
-    conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit
     // todo should create two sets of tests, one with LocalFileTxStateStorage and one with InMemoryTxStateStorage
     txStateStorage = new InMemoryTransactionStateStorage();
     txManager = new TransactionManager
@@ -65,104 +65,24 @@ public class TransactionManagerTest extends TransactionSystemTest {
     txManager.startAndWait();
   }
 
-  @After
-  public void after() {
+  @AfterClass
+  public static void afterClass() {
     txManager.stopAndWait();
   }
 
-  @Test
-  public void testCheckpointing() throws TransactionNotInProgressException {
-    // create a few transactions
-    Transaction tx1 = txManager.startShort();
-    Transaction tx2 = txManager.startShort();
-    Transaction tx3 = txManager.startShort();
-
-    // start and commit a few
-    for (int i = 0; i < 5; i++) {
-      Transaction tx = txManager.startShort();
-      Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
-      Assert.assertTrue(txManager.commit(tx));
-    }
-
-    // checkpoint the transactions
-    Transaction tx3c = txManager.checkpoint(tx3);
-    Transaction tx2c = txManager.checkpoint(tx2);
-    Transaction tx1c = txManager.checkpoint(tx1);
-
-    // start and commit a few (this moves the read pointer past all checkpoint write versions)
-    for (int i = 5; i < 10; i++) {
-      Transaction tx = txManager.startShort();
-      Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
-      Assert.assertTrue(txManager.commit(tx));
-    }
-
-    // start new tx and validate all write pointers are excluded
-    Transaction tx = txManager.startShort();
-    validateSorted(tx.getInProgress());
-    validateSorted(tx.getInvalids());
-    Assert.assertFalse(tx.isVisible(tx1.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx1c.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
-    txManager.abort(tx);
-
-    // abort one of the checkpoints
-    txManager.abort(tx1c);
-
-    // start new tx and validate all write pointers are excluded
-    tx = txManager.startShort();
-    validateSorted(tx.getInProgress());
-    validateSorted(tx.getInvalids());
-    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
-    txManager.abort(tx);
-
-    // invalidate one of the checkpoints
-    txManager.invalidate(tx2c.getTransactionId());
-
-    // start new tx and validate all write pointers are excluded
-    tx = txManager.startShort();
-    validateSorted(tx.getInProgress());
-    validateSorted(tx.getInvalids());
-    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
-    txManager.abort(tx);
-
-    // commit the last checkpoint
-    Assert.assertTrue(txManager.canCommit(tx3, Collections.<byte[]>emptyList()));
-    Assert.assertTrue(txManager.commit(tx3c));
-
-    // start new tx and validate all write pointers are excluded
-    tx = txManager.startShort();
-    validateSorted(tx.getInProgress());
-    validateSorted(tx.getInvalids());
-    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
-    txManager.abort(tx);
-  }
-
-  private void validateSorted(long[] array) {
-    Long lastSeen = null;
-    for (long value : array) {
-      Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)),
-                        lastSeen == null || lastSeen < value);
-      lastSeen = value;
-    }
+  @After
+  public void after() {
+    txManager.resetState();
   }
 
   @Test
   public void testTransactionCleanup() throws Exception {
-    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
-    conf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2);
+    Configuration config = new Configuration(conf);
+    config.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
+    config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2);
     // using a new tx manager that cleans up
     TransactionManager txm = new TransactionManager
-      (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+      (config, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
     txm.startAndWait();
     try {
       Assert.assertEquals(0, txm.getInvalidSize());
@@ -250,11 +170,12 @@ public class TransactionManagerTest extends TransactionSystemTest {
 
   @Test
   public void testLongTransactionCleanup() throws Exception {
-    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
-    conf.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2);
+    Configuration config = new Configuration(conf);
+    config.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
+    config.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2);
     // using a new tx manager that cleans up
     TransactionManager txm = new TransactionManager
-      (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+      (config, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
     txm.startAndWait();
     try {
       Assert.assertEquals(0, txm.getInvalidSize());

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
index 24798ca..5448052 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
@@ -19,17 +19,21 @@
 package org.apache.tephra;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.persist.TransactionSnapshot;
 import org.apache.tephra.persist.TransactionStateStorage;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
- *
+ * Base class for testing implementations of {@link TransactionSystemClient}.
  */
 public abstract class TransactionSystemTest {
 
@@ -38,6 +42,20 @@ public abstract class TransactionSystemTest {
   private static final byte[] C3 = new byte[] { 'c', '3' };
   private static final byte[] C4 = new byte[] { 'c', '4' };
 
+  /**
+   * Sets up the common properties required for the test cases defined here.
+   * Subclasses can call this and add more properties as needed.
+   */
+  static Configuration getCommonConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit
+    conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_LIMIT, 50);
+    conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD, 40);
+    conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_LIMIT, 2048);
+    conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD, 1024);
+    return conf;
+  }
+
   protected abstract TransactionSystemClient getClient() throws Exception;
 
   protected abstract TransactionStateStorage getStateStorage() throws Exception;
@@ -63,6 +81,52 @@ public abstract class TransactionSystemTest {
   }
 
   @Test
+  public void testLargeChangeSet() throws Exception {
+    TransactionSystemClient client = getClient();
+    // first try with 50 changes (the max allowed)
+    List<byte[]> fiftyChanges = new ArrayList<>(51);
+    for (byte i = 0; i < 50; i++) {
+      fiftyChanges.add(new byte[] { i });
+    }
+    Transaction tx = client.startShort();
+    client.canCommitOrThrow(tx, fiftyChanges);
+    client.commit(tx);
+
+    // now try another transaction with 51 changes
+    fiftyChanges.add(new byte[] { 50 });
+    tx = client.startShort();
+    try {
+      client.canCommitOrThrow(tx, fiftyChanges);
+      Assert.fail("Expected " + TransactionSizeException.class.getName());
+    } catch (TransactionSizeException e) {
+      // expected
+    }
+    client.abort(tx);
+
+    // now try a change set that is just within the size limit
+    List<byte[]> changes2k = new ArrayList<>(51);
+    for (byte i = 0; i < 8; i++) {
+      byte[] change = new byte[256];
+      change[0] = i;
+      changes2k.add(change);
+    }
+    tx = client.startShort();
+    client.canCommitOrThrow(tx, changes2k);
+    client.commit(tx);
+
+    // now add another byte to the change set to exceed the limit
+    changes2k.add(new byte[] { 0 });
+    tx = client.startShort();
+    try {
+      client.canCommitOrThrow(tx, changes2k);
+      Assert.fail("Expected " + TransactionSizeException.class.getName());
+    } catch (TransactionSizeException e) {
+      // expected
+    }
+    client.abort(tx);
+  }
+
+  @Test
   public void testCommitRaceHandling() throws Exception {
     TransactionSystemClient client1 = getClient();
     TransactionSystemClient client2 = getClient();
@@ -70,9 +134,9 @@ public abstract class TransactionSystemTest {
     Transaction tx1 = client1.startShort();
     Transaction tx2 = client2.startShort();
 
-    Assert.assertTrue(client1.canCommit(tx1, asList(C1, C2)));
+    Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1, C2)));
     // second one also can commit even though there are conflicts with first since first one hasn't committed yet
-    Assert.assertTrue(client2.canCommit(tx2, asList(C2, C3)));
+    Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2, C3)));
 
     Assert.assertTrue(client1.commit(tx1));
 
@@ -97,16 +161,16 @@ public abstract class TransactionSystemTest {
     Transaction tx4 = client4.startShort();
     Transaction tx5 = client5.startShort();
 
-    Assert.assertTrue(client1.canCommit(tx1, asList(C1)));
+    Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1)));
     Assert.assertTrue(client1.commit(tx1));
 
-    Assert.assertTrue(client2.canCommit(tx2, asList(C2)));
+    Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2)));
     Assert.assertTrue(client2.commit(tx2));
 
     // verifying conflicts detection
-    Assert.assertFalse(client3.canCommit(tx3, asList(C1)));
-    Assert.assertFalse(client4.canCommit(tx4, asList(C2)));
-    Assert.assertTrue(client5.canCommit(tx5, asList(C3)));
+    Assert.assertFalse(client3.canCommitOrThrow(tx3, asList(C1)));
+    Assert.assertFalse(client4.canCommitOrThrow(tx4, asList(C2)));
+    Assert.assertTrue(client5.canCommitOrThrow(tx5, asList(C3)));
   }
 
   @Test
@@ -114,7 +178,7 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
     Assert.assertTrue(client.commit(tx));
     // cannot commit twice same tx
     try {
@@ -130,7 +194,7 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
     client.abort(tx);
     // abort of not active tx has no effect
     client.abort(tx);
@@ -141,11 +205,11 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
     Assert.assertTrue(client.commit(tx));
     // can't re-use same tx again
     try {
-      client.canCommit(tx, asList(C3, C4));
+      client.canCommitOrThrow(tx, asList(C3, C4));
       Assert.fail();
     } catch (TransactionNotInProgressException e) {
       // expected
@@ -172,7 +236,7 @@ public abstract class TransactionSystemTest {
                                         new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, 
                                         TransactionType.SHORT);
     try {
-      Assert.assertFalse(client.canCommit(txOld, asList(C3, C4)));
+      Assert.assertFalse(client.canCommitOrThrow(txOld, asList(C3, C4)));
       Assert.fail();
     } catch (TransactionNotInProgressException e) {
       // expected
@@ -191,7 +255,7 @@ public abstract class TransactionSystemTest {
                                         new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, 
                                         TransactionType.SHORT);
     try {
-      Assert.assertFalse(client.canCommit(txNew, asList(C3, C4)));
+      Assert.assertFalse(client.canCommitOrThrow(txNew, asList(C3, C4)));
       Assert.fail();
     } catch (TransactionNotInProgressException e) {
       // expected
@@ -211,7 +275,7 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
     Assert.assertTrue(client.commit(tx));
     // abort of not active tx has no effect
     client.abort(tx);
@@ -223,11 +287,11 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     // Invalidate an in-progress tx
     Transaction tx1 = client.startShort();
-    client.canCommit(tx1, asList(C1, C2));
+    client.canCommitOrThrow(tx1, asList(C1, C2));
     Assert.assertTrue(client.invalidate(tx1.getTransactionId()));
     // Cannot invalidate a committed tx
     Transaction tx2 = client.startShort();
-    client.canCommit(tx2, asList(C3, C4));
+    client.canCommitOrThrow(tx2, asList(C3, C4));
     client.commit(tx2);
     Assert.assertFalse(client.invalidate(tx2.getTransactionId()));
   }
@@ -241,9 +305,9 @@ public abstract class TransactionSystemTest {
 
     Transaction tx1 = client.startShort();
     Transaction tx2 = client.startShort();
-    client.canCommit(tx1, asList(C1, C2));
+    client.canCommitOrThrow(tx1, asList(C1, C2));
     client.commit(tx1);
-    client.canCommit(tx2, asList(C3, C4));
+    client.canCommitOrThrow(tx2, asList(C3, C4));
 
     Transaction txPreReset = client.startShort();
     long currentTs = System.currentTimeMillis();
@@ -334,6 +398,93 @@ public abstract class TransactionSystemTest {
     Assert.assertEquals(3, client.getInvalidSize());
   }
 
+  @Test
+  public void testCheckpointing() throws Exception {
+    TransactionSystemClient client = getClient();
+    // create a few transactions
+    Transaction tx1 = client.startShort();
+    Transaction tx2 = client.startShort();
+    Transaction tx3 = client.startShort();
+
+    // start and commit a few
+    for (int i = 0; i < 5; i++) {
+      Transaction tx = client.startShort();
+      Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
+      Assert.assertTrue(client.commit(tx));
+    }
+
+    // checkpoint the transactions
+    Transaction tx3c = client.checkpoint(tx3);
+    Transaction tx2c = client.checkpoint(tx2);
+    Transaction tx1c = client.checkpoint(tx1);
+
+    // start and commit a few (this moves the read pointer past all checkpoint write versions)
+    for (int i = 5; i < 10; i++) {
+      Transaction tx = client.startShort();
+      Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
+      Assert.assertTrue(client.commit(tx));
+    }
+
+    // start new tx and validate all write pointers are excluded
+    Transaction tx = client.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx1.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx1c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    client.abort(tx);
+
+    // abort one of the checkpoints
+    client.abort(tx1c);
+
+    // start new tx and validate all write pointers are excluded
+    tx = client.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    client.abort(tx);
+
+    // invalidate one of the checkpoints
+    client.invalidate(tx2c.getTransactionId());
+
+    // start new tx and validate all write pointers are excluded
+    tx = client.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    client.abort(tx);
+
+    // commit the last checkpoint
+    Assert.assertTrue(client.canCommit(tx3, Collections.<byte[]>emptyList()));
+    Assert.assertTrue(client.commit(tx3c));
+
+    // start new tx and validate all write pointers are excluded
+    tx = client.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    client.abort(tx);
+  }
+
+  private void validateSorted(long[] array) {
+    Long lastSeen = null;
+    for (long value : array) {
+      Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)),
+                        lastSeen == null || lastSeen < value);
+      lastSeen = value;
+    }
+  }
+
   private Collection<byte[]> asList(byte[]... val) {
     return Arrays.asList(val);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
index 18f81c8..9c565ba 100644
--- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
@@ -20,8 +20,6 @@ package org.apache.tephra.snapshot;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
@@ -31,8 +29,8 @@ import com.google.inject.Injector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionFailureException;
 import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionNotInProgressException;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.persist.TransactionSnapshot;
 import org.apache.tephra.persist.TransactionStateStorage;
@@ -49,7 +47,6 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -290,7 +287,7 @@ public class SnapshotCodecTest {
   }
 
   @Test
-  public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressException {
+  public void testSnapshotCodecV4() throws IOException, TransactionFailureException {
     File testDir = tmpDir.newFolder("testSnapshotCodecV4");
     Configuration conf = new Configuration();
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());


[2/2] incubator-tephra git commit: (TEPHRA-241) Add a way to limit the size of a transaction

Posted by an...@apache.org.
(TEPHRA-241) Add a way to limit the size of a transaction

This closes #48

Signed-off-by: anew <an...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/ae6ce2b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/ae6ce2b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/ae6ce2b5

Branch: refs/heads/master
Commit: ae6ce2b5e83eef3ef50f7b025b2ff666d539e391
Parents: 8532076
Author: anew <an...@apache.org>
Authored: Thu Aug 31 14:43:32 2017 -0700
Committer: anew <an...@apache.org>
Committed: Sat Sep 9 00:11:47 2017 -0700

----------------------------------------------------------------------
 .../apache/tephra/TransactionSizeException.java |   28 +
 .../org/apache/tephra/TransactionContext.java   |    9 +-
 .../org/apache/tephra/TransactionManager.java   |   72 +-
 .../apache/tephra/TransactionSystemClient.java  |   22 +
 .../java/org/apache/tephra/TxConstants.java     |   17 +
 .../distributed/TransactionServiceClient.java   |   63 +-
 .../TransactionServiceThriftClient.java         |   22 +-
 .../TransactionServiceThriftHandler.java        |   21 +
 .../distributed/thrift/TTransactionServer.java  | 1298 +++++++++++++++++-
 .../tephra/inmemory/DetachedTxSystemClient.java |    5 +
 .../tephra/inmemory/InMemoryTxSystemClient.java |   11 +
 .../tephra/inmemory/MinimalTxSystemClient.java  |    5 +
 tephra-core/src/main/thrift/transaction.thrift  |    2 +
 .../tephra/ThriftTransactionSystemTest.java     |    4 +-
 .../apache/tephra/TransactionContextTest.java   |    5 +
 .../apache/tephra/TransactionExecutorTest.java  |    5 +
 .../apache/tephra/TransactionManagerTest.java   |  121 +-
 .../apache/tephra/TransactionSystemTest.java    |  189 ++-
 .../tephra/snapshot/SnapshotCodecTest.java      |    7 +-
 19 files changed, 1702 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-api/src/main/java/org/apache/tephra/TransactionSizeException.java
----------------------------------------------------------------------
diff --git a/tephra-api/src/main/java/org/apache/tephra/TransactionSizeException.java b/tephra-api/src/main/java/org/apache/tephra/TransactionSizeException.java
new file mode 100644
index 0000000..3ea040f
--- /dev/null
+++ b/tephra-api/src/main/java/org/apache/tephra/TransactionSizeException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Thrown to indicate that a transaction's change set exceeds the allowed size.
+ */
+public class TransactionSizeException extends TransactionFailureException {
+  public TransactionSizeException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
index 0806294..8b4e4fd 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
@@ -270,15 +270,12 @@ public class TransactionContext {
 
     boolean canCommit = false;
     try {
-      canCommit = txClient.canCommit(currentTx, changes);
-    } catch (TransactionNotInProgressException e) {
-      String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
-      LOG.warn(message, e);
-      abort(new TransactionFailureException(message, e));
+      canCommit = txClient.canCommitOrThrow(currentTx, changes);
+    } catch (TransactionNotInProgressException | TransactionSizeException e) {
+      throw e;
       // abort will throw that exception
     } catch (Throwable e) {
       String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId());
-      LOG.warn(message, e);
       abort(new TransactionFailureException(message, e));
       // abort will throw that exception
     }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 3f332ad..4479812 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -162,6 +162,11 @@ public class TransactionManager extends AbstractService {
   private final Lock logReadLock = logLock.readLock();
   private final Lock logWriteLock = logLock.writeLock();
 
+  private final int changeSetCountLimit;
+  private final int changeSetCountThreshold;
+  private final long changeSetSizeLimit;
+  private final long changeSetSizeThreshold;
+
   // fudge factor (in milliseconds) used when interpreting transactions as LONG based on expiration
   // TODO: REMOVE WITH txnBackwardsCompatCheck()
   private final long longTimeoutTolerance;
@@ -188,6 +193,15 @@ public class TransactionManager extends AbstractService {
     snapshotRetainCount = Math.max(conf.getInt(TxConstants.Manager.CFG_TX_SNAPSHOT_RETAIN,
                                                TxConstants.Manager.DEFAULT_TX_SNAPSHOT_RETAIN), 1);
 
+    changeSetCountLimit = conf.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_LIMIT,
+                                      TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_LIMIT);
+    changeSetCountThreshold = conf.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD,
+                                          TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_WARN_THRESHOLD);
+    changeSetSizeLimit = conf.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_LIMIT,
+                                      TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_LIMIT);
+    changeSetSizeThreshold = conf.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD,
+                                          TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_WARN_THRESHOLD);
+
     // intentionally not using a constant, as this config should not be exposed
     // TODO: REMOVE WITH txnBackwardsCompatCheck()
     longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 10000);
@@ -839,10 +853,13 @@ public class TransactionManager extends AbstractService {
     }
   }
 
-  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
+  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds)
+    throws TransactionNotInProgressException, TransactionSizeException {
+
     txMetricsCollector.rate("canCommit");
     Stopwatch timer = new Stopwatch().start();
-    if (inProgress.get(tx.getTransactionId()) == null) {
+    InProgressTx inProgressTx = inProgress.get(tx.getTransactionId());
+    if (inProgressTx == null) {
       synchronized (this) {
         // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
         if (invalidTxList.contains(tx.getTransactionId())) {
@@ -857,10 +874,8 @@ public class TransactionManager extends AbstractService {
       }
     }
 
-    Set<ChangeId> set = Sets.newHashSetWithExpectedSize(changeIds.size());
-    for (byte[] change : changeIds) {
-      set.add(new ChangeId(change));
-    }
+    Set<ChangeId> set =
+      validateChangeSet(tx, changeIds, inProgressTx.clientId != null ? inProgressTx.clientId : DEFAULT_CLIENTID);
 
     if (hasConflicts(tx, set)) {
       return false;
@@ -880,6 +895,51 @@ public class TransactionManager extends AbstractService {
     return true;
   }
 
+  /**
+   * Validate the number of changes and the total size of changes. Log a warning if either of them exceeds the
+   * configured threshold, or log a warning and throw an exception if it exceeds the configured limit.
+   *
+   * We log here because application developers may ignore warnings. Logging here gives us a single point
+   * (the tx manager log) to identify all clients that send excessively large change sets.
+   *
+   * @return the same set of changes, transformed into a set of {@link ChangeId}s.
+   * @throws TransactionSizeException if the number or total size of the changes exceed the limit.
+   */
+  private Set<ChangeId> validateChangeSet(Transaction tx, Collection<byte[]> changeIds,
+                                          String clientId) throws TransactionSizeException {
+    if (changeIds.size() > changeSetCountLimit) {
+      LOG.warn("Change set for transaction {} belonging to client '{}' has {} entries and exceeds " +
+                 "the allowed size of {}. Limit the number of changes, or use a long-running transaction. ",
+               tx.getTransactionId(), clientId, changeIds.size(), changeSetCountLimit);
+      throw new TransactionSizeException(String.format(
+        "Change set for transaction %d has %d entries and exceeds the limit of %d",
+        tx.getTransactionId(), changeIds.size(), changeSetCountLimit));
+    } else if (changeIds.size() > changeSetCountThreshold) {
+      LOG.warn("Change set for transaction {} belonging to client '{}' has {} entries. " +
+                 "It is recommended to limit the number of changes to {}, or to use a long-running transaction. ",
+               tx.getTransactionId(), clientId, changeIds.size(), changeSetCountThreshold);
+    }
+    long byteCount = 0L;
+    Set<ChangeId> set = Sets.newHashSetWithExpectedSize(changeIds.size());
+    for (byte[] change : changeIds) {
+      set.add(new ChangeId(change));
+      byteCount += change.length;
+    }
+    if (byteCount > changeSetSizeLimit) {
+      LOG.warn("Change set for transaction {} belonging to client '{}' has total size of {} bytes and exceeds " +
+                 "the allowed size of {} bytes. Limit the total size of changes, or use a long-running transaction. ",
+               tx.getTransactionId(), clientId, byteCount, changeSetSizeLimit);
+      throw new TransactionSizeException(String.format(
+        "Change set for transaction %d has total size of %d bytes and exceeds the limit of %d bytes",
+        tx.getTransactionId(), byteCount, changeSetSizeLimit));
+    } else if (byteCount > changeSetSizeThreshold) {
+      LOG.warn("Change set for transaction {} belonging to client '{}' has total size of {} bytes. " +
+                 "It is recommended to limit the total size to {} bytes, or to use a long-running transaction. ",
+               tx.getTransactionId(), clientId, byteCount, changeSetSizeThreshold);
+    }
+    return set;
+  }
+
   private void addCommittingChangeSet(long writePointer, Set<ChangeId> changes) {
     committingChangeSets.put(writePointer, changes);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
index 9702c61..a44f131 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
@@ -70,10 +70,32 @@ public interface TransactionSystemClient {
    * @param tx transaction to verify
    * @param changeIds ids of changes made by transaction
    * @return true if transaction can be committed otherwise false
+   * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out.
+   *
+   * @deprecated since 0.13-incubating; use {@link #canCommitOrThrow(Transaction, Collection)} instead
    */
+  @Deprecated
   boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException;
 
   /**
+   * Checks if transaction with the set of changes can be committed. E.g. it can check conflicts with other changes and
+   * refuse commit if there are conflicts. It is assumed that no other changes will be done in between this method call
+   * and {@link #commit(Transaction)} which may check conflicts again to avoid races.
+   * <p/>
+   * Since we do conflict detection at commit time as well, this may seem redundant. The idea is to check for conflicts
+   * before we persist changes to avoid rollback in case of conflicts as much as possible.
+   * NOTE: in some situations we may want to skip this step to save on RPC with a risk of many rollback ops. So by
+   *       default we take safe path.
+   *
+   * @param tx transaction to verify
+   * @param changeIds ids of changes made by transaction
+   * @return true if transaction can be committed otherwise false
+   * @throws TransactionSizeException if the size of the change set exceeds the allowed limit
+   * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out.
+   */
+  boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException;
+
+  /**
    * Makes transaction visible. It will again check conflicts of changes submitted previously with
    * {@link #canCommit(Transaction, java.util.Collection)}
    * @param tx transaction to make visible.

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 1dbd3cb..5c78aa4 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -179,6 +179,23 @@ public class TxConstants {
     public static final String CFG_TX_SNAPSHOT_RETAIN = "data.tx.snapshot.retain";
     /** Default value for number of most recent snapshots to retain. */
     public static final int DEFAULT_TX_SNAPSHOT_RETAIN = 10;
+
+    /** The limit for the number of entries in a change set. If exceeded, the transaction fails. */
+    public static final String CFG_TX_CHANGESET_COUNT_LIMIT = "data.tx.changeset.count.limit";
+    /** The warning threshold for the number of entries in a change set. If exceeded, a warning is logged. */
+    public static final String CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD = "data.tx.changeset.count.warn.threshold";
+    /** The limit for the total size in bytes of a change set. If exceeded, the transaction fails. */
+    public static final String CFG_TX_CHANGESET_SIZE_LIMIT = "data.tx.changeset.size.limit";
+    /** The warning threshold for the total size in bytes of a change set. If exceeded, a warning is logged. */
+    public static final String CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD = "data.tx.changeset.size.warn.threshold";
+    /** The default limit for the number of entries in a change set is unlimited. */
+    public static final int DEFAULT_TX_CHANGESET_COUNT_LIMIT = Integer.MAX_VALUE;
+    /** The default warning threshold for the number of entries in a change set is unlimited. */
+    public static final int DEFAULT_TX_CHANGESET_COUNT_WARN_THRESHOLD = Integer.MAX_VALUE;
+    /** The default limit for the total size in bytes of a change set is unlimited. */
+    public static final long DEFAULT_TX_CHANGESET_SIZE_LIMIT = Long.MAX_VALUE;
+    /** The default warning threshold for the total size in bytes of a change set is unlimited. */
+    public static final long DEFAULT_TX_CHANGESET_SIZE_WARN_THRESHOLD = Long.MAX_VALUE;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
index cdcca7f..f1743de 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
@@ -28,7 +28,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionFailureException;
 import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSizeException;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.runtime.ConfigModule;
@@ -65,6 +67,11 @@ public class TransactionServiceClient implements TransactionSystemClient {
   // client id that is used to identify the transactions
   private final String clientId;
 
+  private final int changeSetCountLimit;
+  private final int changeSetCountThreshold;
+  private final long changeSetSizeLimit;
+  private final long changeSetSizeThreshold;
+
   /**
    * Utility to be used for basic verification of transaction system availability and functioning
    * @param args arguments list, accepts single option "-v" that makes it to print out more details about started tx
@@ -109,7 +116,7 @@ public class TransactionServiceClient implements TransactionSystemClient {
                    ", inProgress: " + tx.getInProgress().length);
       }
       LOG.info("Checking if canCommit tx...");
-      boolean canCommit = client.canCommit(tx, Collections.<byte[]>emptyList());
+      boolean canCommit = client.canCommitOrThrow(tx, Collections.<byte[]>emptyList());
       LOG.info("canCommit: " + canCommit);
       if (canCommit) {
         LOG.info("Committing tx...");
@@ -171,6 +178,15 @@ public class TransactionServiceClient implements TransactionSystemClient {
 
     this.clientProvider = clientProvider;
     this.clientId = clientId;
+
+    changeSetCountLimit = config.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_LIMIT,
+                                        TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_LIMIT);
+    changeSetCountThreshold = config.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD,
+                                            TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_WARN_THRESHOLD);
+    changeSetSizeLimit = config.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_LIMIT,
+                                        TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_LIMIT);
+    changeSetSizeThreshold = config.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD,
+                                            TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_WARN_THRESHOLD);
   }
 
   /**
@@ -324,6 +340,51 @@ public class TransactionServiceClient implements TransactionSystemClient {
   }
 
   @Override
+  public boolean canCommitOrThrow(final Transaction tx, final Collection<byte[]> changeIds)
+    throws TransactionFailureException {
+
+    // we want to validate the size of the change set here before sending it over the wire.
+    // if the change set is large, it can cause memory issues on the server side.
+    if (changeIds.size() > changeSetCountLimit) {
+      throw new TransactionSizeException(String.format(
+        "Change set for transaction %d has %d entries and exceeds the limit of %d",
+        tx.getTransactionId(), changeIds.size(), changeSetCountLimit));
+    } else if (changeIds.size() > changeSetCountThreshold) {
+      LOG.warn("Change set for transaction {} has {} entries. " +
+                 "It is recommended to limit the number of changes to {}, or to use a long-running transaction. ",
+               tx.getTransactionId(), changeIds.size(), changeSetCountThreshold);
+    }
+    long byteCount = 0L;
+    for (byte[] change : changeIds) {
+      byteCount += change.length;
+    }
+    if (byteCount > changeSetSizeLimit) {
+      throw new TransactionSizeException(String.format(
+        "Change set for transaction %d has total size of %d bytes and exceeds the limit of %d bytes",
+        tx.getTransactionId(), byteCount, changeSetSizeLimit));
+    } else if (byteCount > changeSetSizeThreshold) {
+      LOG.warn("Change set for transaction {} has total size of {} bytes. " +
+                 "It is recommended to limit the total size to {} bytes, or to use a long-running transaction. ",
+               tx.getTransactionId(), byteCount, changeSetSizeThreshold);
+    }
+
+    try {
+      return execute(
+        new Operation<Boolean>("canCommit") {
+          @Override
+          public Boolean execute(TransactionServiceThriftClient client)
+            throws Exception {
+            return client.canCommitOrThrow(tx, changeIds);
+          }
+        });
+    } catch (TransactionNotInProgressException | TransactionSizeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
   public boolean commit(final Transaction tx) throws TransactionNotInProgressException {
     try {
       return this.execute(

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
index ccd266a..ba37243 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
@@ -25,6 +25,7 @@ import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
 import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSizeException;
 import org.apache.tephra.distributed.thrift.TGenericException;
 import org.apache.tephra.distributed.thrift.TInvalidTruncateTimeException;
 import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException;
@@ -196,7 +197,26 @@ public class TransactionServiceThriftClient {
     }
   }
 
-
+  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds)
+    throws TException, TransactionNotInProgressException, TransactionSizeException {
+    try {
+      return client.canCommitOrThrow(TransactionConverterUtils.wrap(tx),
+                                     ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue();
+    } catch (TTransactionNotInProgressException e) {
+      throw new TransactionNotInProgressException(e.getMessage());
+    } catch (TGenericException e) {
+      // the canCommitOrThrow RPC reports a TransactionSizeException as a TGenericException carrying its class name
+      if (!TransactionSizeException.class.getName().equals(e.getOriginalExceptionClass())) {
+        LOG.trace("Expecting only {} as the original exception class but found {}",
+                  TransactionSizeException.class.getName(), e.getOriginalExceptionClass());
+        throw e;
+      }
+      throw new TransactionSizeException(e.getMessage());
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
 
   public boolean commit(Transaction tx) throws TException, TransactionNotInProgressException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
index 174b463..0c9105b 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSizeException;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.distributed.thrift.TBoolean;
 import org.apache.tephra.distributed.thrift.TGenericException;
@@ -129,6 +130,26 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface
       return new TBoolean(txManager.canCommit(TransactionConverterUtils.unwrap(tx), changeIds));
     } catch (TransactionNotInProgressException e) {
       throw new TTransactionNotInProgressException(e.getMessage());
+    } catch (TransactionSizeException e) {
+      return new TBoolean(false); // can't throw exception -> just indicate that it failed
+    }
+  }
+
+  @Override
+  public TBoolean canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes) throws TException {
+
+    Set<byte[]> changeIds = Sets.newHashSet();
+    for (ByteBuffer bb : changes) {
+      byte[] changeId = new byte[bb.remaining()];
+      bb.get(changeId);
+      changeIds.add(changeId);
+    }
+    try {
+      return new TBoolean(txManager.canCommit(TransactionConverterUtils.unwrap(tx), changeIds));
+    } catch (TransactionNotInProgressException e) {
+      throw new TTransactionNotInProgressException(e.getMessage());
+    } catch (TransactionSizeException e) {
+      throw new TGenericException(e.getMessage(), TransactionSizeException.class.getName());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
index 6c99bb4..6c07ccb 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
@@ -68,6 +68,8 @@ public class TTransactionServer {
 
     public TBoolean canCommitTx(TTransaction tx, Set<ByteBuffer> changes) throws TTransactionNotInProgressException, org.apache.thrift.TException;
 
+    public TBoolean canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes) throws TTransactionNotInProgressException, TGenericException, org.apache.thrift.TException;
+
     public TBoolean commitTx(TTransaction tx) throws TTransactionNotInProgressException, org.apache.thrift.TException;
 
     public void abortTx(TTransaction tx) throws org.apache.thrift.TException;
@@ -110,6 +112,8 @@ public class TTransactionServer {
 
     public void canCommitTx(TTransaction tx, Set<ByteBuffer> changes, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.canCommitTx_call> resultHandler) throws org.apache.thrift.TException;
 
+    public void canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.canCommitOrThrow_call> resultHandler) throws org.apache.thrift.TException;
+
     public void commitTx(TTransaction tx, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.commitTx_call> resultHandler) throws org.apache.thrift.TException;
 
     public void abortTx(TTransaction tx, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.abortTx_call> resultHandler) throws org.apache.thrift.TException;
@@ -353,6 +357,36 @@ public class TTransactionServer {
       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "canCommitTx failed: unknown result");
     }
 
+    public TBoolean canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes) throws TTransactionNotInProgressException, TGenericException, org.apache.thrift.TException
+    {
+      send_canCommitOrThrow(tx, changes);
+      return recv_canCommitOrThrow();
+    }
+
+    public void send_canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes) throws org.apache.thrift.TException
+    {
+      canCommitOrThrow_args args = new canCommitOrThrow_args();
+      args.setTx(tx);
+      args.setChanges(changes);
+      sendBase("canCommitOrThrow", args);
+    }
+
+    public TBoolean recv_canCommitOrThrow() throws TTransactionNotInProgressException, TGenericException, org.apache.thrift.TException
+    {
+      canCommitOrThrow_result result = new canCommitOrThrow_result();
+      receiveBase(result, "canCommitOrThrow");
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      if (result.e != null) {
+        throw result.e;
+      }
+      if (result.g != null) {
+        throw result.g;
+      }
+      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "canCommitOrThrow failed: unknown result");
+    }
+
     public TBoolean commitTx(TTransaction tx) throws TTransactionNotInProgressException, org.apache.thrift.TException
     {
       send_commitTx(tx);
@@ -878,6 +912,41 @@ public class TTransactionServer {
       }
     }
 
+    public void canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes, org.apache.thrift.async.AsyncMethodCallback<canCommitOrThrow_call> resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      canCommitOrThrow_call method_call = new canCommitOrThrow_call(tx, changes, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class canCommitOrThrow_call extends org.apache.thrift.async.TAsyncMethodCall {
+      private TTransaction tx;
+      private Set<ByteBuffer> changes;
+      public canCommitOrThrow_call(TTransaction tx, Set<ByteBuffer> changes, org.apache.thrift.async.AsyncMethodCallback<canCommitOrThrow_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.tx = tx;
+        this.changes = changes;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("canCommitOrThrow", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        canCommitOrThrow_args args = new canCommitOrThrow_args();
+        args.setTx(tx);
+        args.setChanges(changes);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public TBoolean getResult() throws TTransactionNotInProgressException, TGenericException, org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return (new Client(prot)).recv_canCommitOrThrow();
+      }
+    }
+
     public void commitTx(TTransaction tx, org.apache.thrift.async.AsyncMethodCallback<commitTx_call> resultHandler) throws org.apache.thrift.TException {
       checkReady();
       commitTx_call method_call = new commitTx_call(tx, resultHandler, this, ___protocolFactory, ___transport);
@@ -1236,6 +1305,7 @@ public class TTransactionServer {
       processMap.put("startShortWithClientIdAndTimeOut", new startShortWithClientIdAndTimeOut());
       processMap.put("startShortWithTimeout", new startShortWithTimeout());
       processMap.put("canCommitTx", new canCommitTx());
+      processMap.put("canCommitOrThrow", new canCommitOrThrow());
       processMap.put("commitTx", new commitTx());
       processMap.put("abortTx", new abortTx());
       processMap.put("invalidateTx", new invalidateTx());
@@ -1430,6 +1500,32 @@ public class TTransactionServer {
       }
     }
 
+    public static class canCommitOrThrow<I extends Iface> extends org.apache.thrift.ProcessFunction<I, canCommitOrThrow_args> {
+      public canCommitOrThrow() {
+        super("canCommitOrThrow");
+      }
+
+      public canCommitOrThrow_args getEmptyArgsInstance() {
+        return new canCommitOrThrow_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public canCommitOrThrow_result getResult(I iface, canCommitOrThrow_args args) throws org.apache.thrift.TException {
+        canCommitOrThrow_result result = new canCommitOrThrow_result();
+        try {
+          result.success = iface.canCommitOrThrow(args.tx, args.changes);
+        } catch (TTransactionNotInProgressException e) {
+          result.e = e;
+        } catch (TGenericException g) {
+          result.g = g;
+        }
+        return result;
+      }
+    }
+
     public static class commitTx<I extends Iface> extends org.apache.thrift.ProcessFunction<I, commitTx_args> {
       public commitTx() {
         super("commitTx");
@@ -7921,22 +8017,25 @@ public class TTransactionServer {
 
   }
 
-  public static class commitTx_args implements org.apache.thrift.TBase<commitTx_args, commitTx_args._Fields>, java.io.Serializable, Cloneable   {
-    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("commitTx_args");
+  public static class canCommitOrThrow_args implements org.apache.thrift.TBase<canCommitOrThrow_args, canCommitOrThrow_args._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("canCommitOrThrow_args");
 
     private static final org.apache.thrift.protocol.TField TX_FIELD_DESC = new org.apache.thrift.protocol.TField("tx", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+    private static final org.apache.thrift.protocol.TField CHANGES_FIELD_DESC = new org.apache.thrift.protocol.TField("changes", org.apache.thrift.protocol.TType.SET, (short)2);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
-      schemes.put(StandardScheme.class, new commitTx_argsStandardSchemeFactory());
-      schemes.put(TupleScheme.class, new commitTx_argsTupleSchemeFactory());
+      schemes.put(StandardScheme.class, new canCommitOrThrow_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new canCommitOrThrow_argsTupleSchemeFactory());
     }
 
     public TTransaction tx; // required
+    public Set<ByteBuffer> changes; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-      TX((short)1, "tx");
+      TX((short)1, "tx"),
+      CHANGES((short)2, "changes");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -7953,6 +8052,8 @@ public class TTransactionServer {
         switch(fieldId) {
           case 1: // TX
             return TX;
+          case 2: // CHANGES
+            return CHANGES;
           default:
             return null;
         }
@@ -7998,43 +8099,58 @@ public class TTransactionServer {
       Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
       tmpMap.put(_Fields.TX, new org.apache.thrift.meta_data.FieldMetaData("tx", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TTransaction.class)));
+      tmpMap.put(_Fields.CHANGES, new org.apache.thrift.meta_data.FieldMetaData("changes", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.SetMetaData(org.apache.thrift.protocol.TType.SET, 
+              new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING              , true))));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
-      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(commitTx_args.class, metaDataMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(canCommitOrThrow_args.class, metaDataMap);
     }
 
-    public commitTx_args() {
+    public canCommitOrThrow_args() {
     }
 
-    public commitTx_args(
-      TTransaction tx)
+    public canCommitOrThrow_args(
+      TTransaction tx,
+      Set<ByteBuffer> changes)
     {
       this();
       this.tx = tx;
+      this.changes = changes;
     }
 
     /**
      * Performs a deep copy on <i>other</i>.
      */
-    public commitTx_args(commitTx_args other) {
+    public canCommitOrThrow_args(canCommitOrThrow_args other) {
       if (other.isSetTx()) {
         this.tx = new TTransaction(other.tx);
       }
+      if (other.isSetChanges()) {
+        Set<ByteBuffer> __this__changes = new HashSet<ByteBuffer>();
+        for (ByteBuffer other_element : other.changes) {
+          ByteBuffer temp_binary_element = org.apache.thrift.TBaseHelper.copyBinary(other_element);
+;
+          __this__changes.add(temp_binary_element);
+        }
+        this.changes = __this__changes;
+      }
     }
 
-    public commitTx_args deepCopy() {
-      return new commitTx_args(this);
+    public canCommitOrThrow_args deepCopy() {
+      return new canCommitOrThrow_args(this);
     }
 
     @Override
     public void clear() {
       this.tx = null;
+      this.changes = null;
     }
 
     public TTransaction getTx() {
       return this.tx;
     }
 
-    public commitTx_args setTx(TTransaction tx) {
+    public canCommitOrThrow_args setTx(TTransaction tx) {
       this.tx = tx;
       return this;
     }
@@ -8054,6 +8170,45 @@ public class TTransactionServer {
       }
     }
 
+    public int getChangesSize() {
+      return (this.changes == null) ? 0 : this.changes.size();
+    }
+
+    public java.util.Iterator<ByteBuffer> getChangesIterator() {
+      return (this.changes == null) ? null : this.changes.iterator();
+    }
+
+    public void addToChanges(ByteBuffer elem) {
+      if (this.changes == null) {
+        this.changes = new HashSet<ByteBuffer>();
+      }
+      this.changes.add(elem);
+    }
+
+    public Set<ByteBuffer> getChanges() {
+      return this.changes;
+    }
+
+    public canCommitOrThrow_args setChanges(Set<ByteBuffer> changes) {
+      this.changes = changes;
+      return this;
+    }
+
+    public void unsetChanges() {
+      this.changes = null;
+    }
+
+    /** Returns true if field changes is set (has been assigned a value) and false otherwise */
+    public boolean isSetChanges() {
+      return this.changes != null;
+    }
+
+    public void setChangesIsSet(boolean value) {
+      if (!value) {
+        this.changes = null;
+      }
+    }
+
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
       case TX:
@@ -8064,6 +8219,14 @@ public class TTransactionServer {
         }
         break;
 
+      case CHANGES:
+        if (value == null) {
+          unsetChanges();
+        } else {
+          setChanges((Set<ByteBuffer>)value);
+        }
+        break;
+
       }
     }
 
@@ -8072,6 +8235,9 @@ public class TTransactionServer {
       case TX:
         return getTx();
 
+      case CHANGES:
+        return getChanges();
+
       }
       throw new IllegalStateException();
     }
@@ -8085,6 +8251,8 @@ public class TTransactionServer {
       switch (field) {
       case TX:
         return isSetTx();
+      case CHANGES:
+        return isSetChanges();
       }
       throw new IllegalStateException();
     }
@@ -8093,12 +8261,12 @@ public class TTransactionServer {
     public boolean equals(Object that) {
       if (that == null)
         return false;
-      if (that instanceof commitTx_args)
-        return this.equals((commitTx_args)that);
+      if (that instanceof canCommitOrThrow_args)
+        return this.equals((canCommitOrThrow_args)that);
       return false;
     }
 
-    public boolean equals(commitTx_args that) {
+    public boolean equals(canCommitOrThrow_args that) {
       if (that == null)
         return false;
 
@@ -8111,6 +8279,15 @@ public class TTransactionServer {
           return false;
       }
 
+      boolean this_present_changes = true && this.isSetChanges();
+      boolean that_present_changes = true && that.isSetChanges();
+      if (this_present_changes || that_present_changes) {
+        if (!(this_present_changes && that_present_changes))
+          return false;
+        if (!this.changes.equals(that.changes))
+          return false;
+      }
+
       return true;
     }
 
@@ -8119,13 +8296,13 @@ public class TTransactionServer {
       return 0;
     }
 
-    public int compareTo(commitTx_args other) {
+    public int compareTo(canCommitOrThrow_args other) {
       if (!getClass().equals(other.getClass())) {
         return getClass().getName().compareTo(other.getClass().getName());
       }
 
       int lastComparison = 0;
-      commitTx_args typedOther = (commitTx_args)other;
+      canCommitOrThrow_args typedOther = (canCommitOrThrow_args)other;
 
       lastComparison = Boolean.valueOf(isSetTx()).compareTo(typedOther.isSetTx());
       if (lastComparison != 0) {
@@ -8137,6 +8314,16 @@ public class TTransactionServer {
           return lastComparison;
         }
       }
+      lastComparison = Boolean.valueOf(isSetChanges()).compareTo(typedOther.isSetChanges());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetChanges()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.changes, typedOther.changes);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -8154,7 +8341,7 @@ public class TTransactionServer {
 
     @Override
     public String toString() {
-      StringBuilder sb = new StringBuilder("commitTx_args(");
+      StringBuilder sb = new StringBuilder("canCommitOrThrow_args(");
       boolean first = true;
 
       sb.append("tx:");
@@ -8164,6 +8351,14 @@ public class TTransactionServer {
         sb.append(this.tx);
       }
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("changes:");
+      if (this.changes == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.changes);
+      }
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -8192,15 +8387,15 @@ public class TTransactionServer {
       }
     }
 
-    private static class commitTx_argsStandardSchemeFactory implements SchemeFactory {
-      public commitTx_argsStandardScheme getScheme() {
-        return new commitTx_argsStandardScheme();
+    private static class canCommitOrThrow_argsStandardSchemeFactory implements SchemeFactory {
+      public canCommitOrThrow_argsStandardScheme getScheme() {
+        return new canCommitOrThrow_argsStandardScheme();
       }
     }
 
-    private static class commitTx_argsStandardScheme extends StandardScheme<commitTx_args> {
+    private static class canCommitOrThrow_argsStandardScheme extends StandardScheme<canCommitOrThrow_args> {
 
-      public void read(org.apache.thrift.protocol.TProtocol iprot, commitTx_args struct) throws org.apache.thrift.TException {
+      public void read(org.apache.thrift.protocol.TProtocol iprot, canCommitOrThrow_args struct) throws org.apache.thrift.TException {
         org.apache.thrift.protocol.TField schemeField;
         iprot.readStructBegin();
         while (true)
@@ -8219,6 +8414,24 @@ public class TTransactionServer {
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
+            case 2: // CHANGES
+              if (schemeField.type == org.apache.thrift.protocol.TType.SET) {
+                {
+                  org.apache.thrift.protocol.TSet _set32 = iprot.readSetBegin();
+                  struct.changes = new HashSet<ByteBuffer>(2*_set32.size);
+                  for (int _i33 = 0; _i33 < _set32.size; ++_i33)
+                  {
+                    ByteBuffer _elem34; // required
+                    _elem34 = iprot.readBinary();
+                    struct.changes.add(_elem34);
+                  }
+                  iprot.readSetEnd();
+                }
+                struct.setChangesIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
           }
@@ -8230,7 +8443,7 @@ public class TTransactionServer {
         struct.validate();
       }
 
-      public void write(org.apache.thrift.protocol.TProtocol oprot, commitTx_args struct) throws org.apache.thrift.TException {
+      public void write(org.apache.thrift.protocol.TProtocol oprot, canCommitOrThrow_args struct) throws org.apache.thrift.TException {
         struct.validate();
 
         oprot.writeStructBegin(STRUCT_DESC);
@@ -8239,66 +8452,106 @@ public class TTransactionServer {
           struct.tx.write(oprot);
           oprot.writeFieldEnd();
         }
+        if (struct.changes != null) {
+          oprot.writeFieldBegin(CHANGES_FIELD_DESC);
+          {
+            oprot.writeSetBegin(new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.STRING, struct.changes.size()));
+            for (ByteBuffer _iter35 : struct.changes)
+            {
+              oprot.writeBinary(_iter35);
+            }
+            oprot.writeSetEnd();
+          }
+          oprot.writeFieldEnd();
+        }
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
 
     }
 
-    private static class commitTx_argsTupleSchemeFactory implements SchemeFactory {
-      public commitTx_argsTupleScheme getScheme() {
-        return new commitTx_argsTupleScheme();
+    private static class canCommitOrThrow_argsTupleSchemeFactory implements SchemeFactory {
+      public canCommitOrThrow_argsTupleScheme getScheme() {
+        return new canCommitOrThrow_argsTupleScheme();
       }
     }
 
-    private static class commitTx_argsTupleScheme extends TupleScheme<commitTx_args> {
+    private static class canCommitOrThrow_argsTupleScheme extends TupleScheme<canCommitOrThrow_args> {
 
       @Override
-      public void write(org.apache.thrift.protocol.TProtocol prot, commitTx_args struct) throws org.apache.thrift.TException {
+      public void write(org.apache.thrift.protocol.TProtocol prot, canCommitOrThrow_args struct) throws org.apache.thrift.TException {
         TTupleProtocol oprot = (TTupleProtocol) prot;
         BitSet optionals = new BitSet();
         if (struct.isSetTx()) {
           optionals.set(0);
         }
-        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetChanges()) {
+          optionals.set(1);
+        }
+        oprot.writeBitSet(optionals, 2);
         if (struct.isSetTx()) {
           struct.tx.write(oprot);
         }
+        if (struct.isSetChanges()) {
+          {
+            oprot.writeI32(struct.changes.size());
+            for (ByteBuffer _iter36 : struct.changes)
+            {
+              oprot.writeBinary(_iter36);
+            }
+          }
+        }
       }
 
       @Override
-      public void read(org.apache.thrift.protocol.TProtocol prot, commitTx_args struct) throws org.apache.thrift.TException {
+      public void read(org.apache.thrift.protocol.TProtocol prot, canCommitOrThrow_args struct) throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
-        BitSet incoming = iprot.readBitSet(1);
+        BitSet incoming = iprot.readBitSet(2);
         if (incoming.get(0)) {
           struct.tx = new TTransaction();
           struct.tx.read(iprot);
           struct.setTxIsSet(true);
         }
+        if (incoming.get(1)) {
+          {
+            org.apache.thrift.protocol.TSet _set37 = new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+            struct.changes = new HashSet<ByteBuffer>(2*_set37.size);
+            for (int _i38 = 0; _i38 < _set37.size; ++_i38)
+            {
+              ByteBuffer _elem39; // required
+              _elem39 = iprot.readBinary();
+              struct.changes.add(_elem39);
+            }
+          }
+          struct.setChangesIsSet(true);
+        }
       }
     }
 
   }
 
-  public static class commitTx_result implements org.apache.thrift.TBase<commitTx_result, commitTx_result._Fields>, java.io.Serializable, Cloneable   {
-    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("commitTx_result");
+  public static class canCommitOrThrow_result implements org.apache.thrift.TBase<canCommitOrThrow_result, canCommitOrThrow_result._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("canCommitOrThrow_result");
 
     private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRUCT, (short)0);
     private static final org.apache.thrift.protocol.TField E_FIELD_DESC = new org.apache.thrift.protocol.TField("e", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+    private static final org.apache.thrift.protocol.TField G_FIELD_DESC = new org.apache.thrift.protocol.TField("g", org.apache.thrift.protocol.TType.STRUCT, (short)2);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
-      schemes.put(StandardScheme.class, new commitTx_resultStandardSchemeFactory());
-      schemes.put(TupleScheme.class, new commitTx_resultTupleSchemeFactory());
+      schemes.put(StandardScheme.class, new canCommitOrThrow_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new canCommitOrThrow_resultTupleSchemeFactory());
     }
 
     public TBoolean success; // required
     public TTransactionNotInProgressException e; // required
+    public TGenericException g; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
       SUCCESS((short)0, "success"),
-      E((short)1, "e");
+      E((short)1, "e"),
+      G((short)2, "g");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -8317,6 +8570,8 @@ public class TTransactionServer {
             return SUCCESS;
           case 1: // E
             return E;
+          case 2: // G
+            return G;
           default:
             return null;
         }
@@ -8364,49 +8619,57 @@ public class TTransactionServer {
           new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TBoolean.class)));
       tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+      tmpMap.put(_Fields.G, new org.apache.thrift.meta_data.FieldMetaData("g", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
-      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(commitTx_result.class, metaDataMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(canCommitOrThrow_result.class, metaDataMap);
     }
 
-    public commitTx_result() {
+    public canCommitOrThrow_result() {
     }
 
-    public commitTx_result(
+    public canCommitOrThrow_result(
       TBoolean success,
-      TTransactionNotInProgressException e)
+      TTransactionNotInProgressException e,
+      TGenericException g)
     {
       this();
       this.success = success;
       this.e = e;
+      this.g = g;
     }
 
     /**
      * Performs a deep copy on <i>other</i>.
      */
-    public commitTx_result(commitTx_result other) {
+    public canCommitOrThrow_result(canCommitOrThrow_result other) {
       if (other.isSetSuccess()) {
         this.success = new TBoolean(other.success);
       }
       if (other.isSetE()) {
         this.e = new TTransactionNotInProgressException(other.e);
       }
+      if (other.isSetG()) {
+        this.g = new TGenericException(other.g);
+      }
     }
 
-    public commitTx_result deepCopy() {
-      return new commitTx_result(this);
+    public canCommitOrThrow_result deepCopy() {
+      return new canCommitOrThrow_result(this);
     }
 
     @Override
     public void clear() {
       this.success = null;
       this.e = null;
+      this.g = null;
     }
 
     public TBoolean getSuccess() {
       return this.success;
     }
 
-    public commitTx_result setSuccess(TBoolean success) {
+    public canCommitOrThrow_result setSuccess(TBoolean success) {
       this.success = success;
       return this;
     }
@@ -8430,7 +8693,916 @@ public class TTransactionServer {
       return this.e;
     }
 
-    public commitTx_result setE(TTransactionNotInProgressException e) {
+    public canCommitOrThrow_result setE(TTransactionNotInProgressException e) {
+      this.e = e;
+      return this;
+    }
+
+    public void unsetE() {
+      this.e = null;
+    }
+
+    /** Returns true if field e is set (has been assigned a value) and false otherwise */
+    public boolean isSetE() {
+      return this.e != null;
+    }
+
+    public void setEIsSet(boolean value) {
+      if (!value) {
+        this.e = null;
+      }
+    }
+
+    public TGenericException getG() {
+      return this.g;
+    }
+
+    public canCommitOrThrow_result setG(TGenericException g) {
+      this.g = g;
+      return this;
+    }
+
+    public void unsetG() {
+      this.g = null;
+    }
+
+    /** Returns true if field g is set (has been assigned a value) and false otherwise */
+    public boolean isSetG() {
+      return this.g != null;
+    }
+
+    public void setGIsSet(boolean value) {
+      if (!value) {
+        this.g = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case SUCCESS:
+        if (value == null) {
+          unsetSuccess();
+        } else {
+          setSuccess((TBoolean)value);
+        }
+        break;
+
+      case E:
+        if (value == null) {
+          unsetE();
+        } else {
+          setE((TTransactionNotInProgressException)value);
+        }
+        break;
+
+      case G:
+        if (value == null) {
+          unsetG();
+        } else {
+          setG((TGenericException)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return getSuccess();
+
+      case E:
+        return getE();
+
+      case G:
+        return getG();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case SUCCESS:
+        return isSetSuccess();
+      case E:
+        return isSetE();
+      case G:
+        return isSetG();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof canCommitOrThrow_result)
+        return this.equals((canCommitOrThrow_result)that);
+      return false;
+    }
+
+    public boolean equals(canCommitOrThrow_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_success = true && this.isSetSuccess();
+      boolean that_present_success = true && that.isSetSuccess();
+      if (this_present_success || that_present_success) {
+        if (!(this_present_success && that_present_success))
+          return false;
+        if (!this.success.equals(that.success))
+          return false;
+      }
+
+      boolean this_present_e = true && this.isSetE();
+      boolean that_present_e = true && that.isSetE();
+      if (this_present_e || that_present_e) {
+        if (!(this_present_e && that_present_e))
+          return false;
+        if (!this.e.equals(that.e))
+          return false;
+      }
+
+      boolean this_present_g = true && this.isSetG();
+      boolean that_present_g = true && that.isSetG();
+      if (this_present_g || that_present_g) {
+        if (!(this_present_g && that_present_g))
+          return false;
+        if (!this.g.equals(that.g))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(canCommitOrThrow_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      canCommitOrThrow_result typedOther = (canCommitOrThrow_result)other;
+
+      lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(typedOther.isSetSuccess());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetSuccess()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, typedOther.success);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      lastComparison = Boolean.valueOf(isSetE()).compareTo(typedOther.isSetE());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetE()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.e, typedOther.e);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      lastComparison = Boolean.valueOf(isSetG()).compareTo(typedOther.isSetG());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetG()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.g, typedOther.g);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+      }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("canCommitOrThrow_result(");
+      boolean first = true;
+
+      sb.append("success:");
+      if (this.success == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.success);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("e:");
+      if (this.e == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.e);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("g:");
+      if (this.g == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.g);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+      if (success != null) {
+        success.validate();
+      }
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class canCommitOrThrow_resultStandardSchemeFactory implements SchemeFactory {
+      public canCommitOrThrow_resultStandardScheme getScheme() {
+        return new canCommitOrThrow_resultStandardScheme();
+      }
+    }
+
+    private static class canCommitOrThrow_resultStandardScheme extends StandardScheme<canCommitOrThrow_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, canCommitOrThrow_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 0: // SUCCESS
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.success = new TBoolean();
+                struct.success.read(iprot);
+                struct.setSuccessIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 1: // E
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.e = new TTransactionNotInProgressException();
+                struct.e.read(iprot);
+                struct.setEIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 2: // G
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.g = new TGenericException();
+                struct.g.read(iprot);
+                struct.setGIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, canCommitOrThrow_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.success != null) {
+          oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+          struct.success.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        if (struct.e != null) {
+          oprot.writeFieldBegin(E_FIELD_DESC);
+          struct.e.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        if (struct.g != null) {
+          oprot.writeFieldBegin(G_FIELD_DESC);
+          struct.g.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class canCommitOrThrow_resultTupleSchemeFactory implements SchemeFactory {
+      public canCommitOrThrow_resultTupleScheme getScheme() {
+        return new canCommitOrThrow_resultTupleScheme();
+      }
+    }
+
+    private static class canCommitOrThrow_resultTupleScheme extends TupleScheme<canCommitOrThrow_result> {
+
+      /** Writes a 3-bit presence mask (success, e, g), then every field that is set. */
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, canCommitOrThrow_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol tupleOut = (TTupleProtocol) prot;
+        // Sample each presence flag once so the mask and the payload agree.
+        boolean hasSuccess = struct.isSetSuccess();
+        boolean hasE = struct.isSetE();
+        boolean hasG = struct.isSetG();
+        BitSet presence = new BitSet();
+        presence.set(0, hasSuccess);
+        presence.set(1, hasE);
+        presence.set(2, hasG);
+        tupleOut.writeBitSet(presence, 3);
+        if (hasSuccess) {
+          struct.success.write(tupleOut);
+        }
+        if (hasE) {
+          struct.e.write(tupleOut);
+        }
+        if (hasG) {
+          struct.g.write(tupleOut);
+        }
+      }
+
+      /** Reads the 3-bit presence mask, then decodes exactly the fields it flags. */
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, canCommitOrThrow_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol tupleIn = (TTupleProtocol) prot;
+        BitSet presence = tupleIn.readBitSet(3);
+        if (presence.get(0)) {
+          struct.success = new TBoolean();
+          struct.success.read(tupleIn);
+          struct.setSuccessIsSet(true);
+        }
+        if (presence.get(1)) {
+          struct.e = new TTransactionNotInProgressException();
+          struct.e.read(tupleIn);
+          struct.setEIsSet(true);
+        }
+        if (presence.get(2)) {
+          struct.g = new TGenericException();
+          struct.g.read(tupleIn);
+          struct.setGIsSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class commitTx_args implements org.apache.thrift.TBase<commitTx_args, commitTx_args._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("commitTx_args");
+
+    private static final org.apache.thrift.protocol.TField TX_FIELD_DESC = new org.apache.thrift.protocol.TField("tx", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new commitTx_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new commitTx_argsTupleSchemeFactory());
+    }
+
+    public TTransaction tx; // required
+
+    /** Enumerates the fields of this struct and offers lookups by Thrift field id and by field name. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      TX((short)1, "tx");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields constant : EnumSet.allOf(_Fields.class)) {
+          byName.put(constant.getFieldName(), constant);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it is not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        if (fieldId == 1) { // TX
+          return TX;
+        }
+        return null;
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an
+       * IllegalArgumentException if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields match = findByThriftId(fieldId);
+        if (match != null) {
+          return match;
+        }
+        throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if it is not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // Read-only per-field metadata (name, requirement level, value type); also registered with FieldMetaData below.
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.TX, new org.apache.thrift.meta_data.FieldMetaData("tx", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TTransaction.class)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(commitTx_args.class, metaDataMap);
+    }
+
+    public commitTx_args() { // creates an instance with tx unset (null)
+    }
+
+    public commitTx_args(
+      TTransaction tx)
+    {
+      this();
+      this.tx = tx; // stores the reference as given; no copy is made
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>: tx is duplicated via TTransaction's copy constructor when set, else left null.
+     */
+    public commitTx_args(commitTx_args other) {
+      if (other.isSetTx()) {
+        this.tx = new TTransaction(other.tx);
+      }
+    }
+
+    public commitTx_args deepCopy() { // delegates to the copy constructor
+      return new commitTx_args(this);
+    }
+
+    @Override
+    public void clear() { // returns the struct to its unset state
+      this.tx = null;
+    }
+
+    public TTransaction getTx() { // null while the field is unset
+      return this.tx;
+    }
+
+    public commitTx_args setTx(TTransaction tx) { // fluent setter: returns this for chaining
+      this.tx = tx;
+      return this;
+    }
+
+    public void unsetTx() { // a null tx is what isSetTx() treats as "unset"
+      this.tx = null;
+    }
+
+    /** Returns true if field tx is set (i.e. it is non-null) and false otherwise */
+    public boolean isSetTx() {
+      return this.tx != null;
+    }
+
+    public void setTxIsSet(boolean value) { // false clears tx; true is a no-op because presence is tracked by non-nullness
+      if (!value) {
+        this.tx = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case TX:
+        // A null value clears the field; any other value is cast to TTransaction and stored.
+        if (value != null) {
+          setTx((TTransaction)value);
+        } else {
+          unsetTx();
+        }
+        break;
+      }
+    }
+
+    public Object getFieldValue(_Fields field) { // generic accessor keyed by field constant
+      switch (field) {
+      case TX:
+        return getTx();
+
+      }
+      throw new IllegalStateException(); // only reached when field matches no case above
+    }
+
+    /** Returns true if the field identified by the given constant is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      // Reject null explicitly so callers get IllegalArgumentException.
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+      if (field == _Fields.TX) {
+        return isSetTx();
+      }
+      // Not reachable while TX is the only constant.
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      // instanceof is false for null, so a single guard covers both rejections.
+      if (!(that instanceof commitTx_args)) {
+        return false;
+      }
+      return this.equals((commitTx_args)that);
+    }
+
+    public boolean equals(commitTx_args that) {
+      if (that == null) {
+        return false;
+      }
+      boolean mine = this.isSetTx();
+      boolean theirs = that.isSetTx();
+      if (mine != theirs) {
+        // exactly one side has tx set
+        return false;
+      }
+      if (!mine) {
+        return true; // tx is unset on both sides
+      }
+      return this.tx.equals(that.tx);
+    }
+
+    @Override
+    public int hashCode() { // derived from tx (0 when unset) so hashing agrees with equals(commitTx_args)
+      return (tx == null) ? 0 : tx.hashCode();
+    }
+
+    public int compareTo(commitTx_args other) {
+      // Instances of different runtime classes are ordered by class name.
+      Class<?> ownType = getClass();
+      Class<?> otherType = other.getClass();
+      if (!ownType.equals(otherType)) {
+        return ownType.getName().compareTo(otherType.getName());
+      }
+
+      // An unset tx orders before a set one; when both are set the
+      // transactions themselves are compared.
+      boolean txPresent = isSetTx();
+      int presenceOrder = Boolean.valueOf(txPresent).compareTo(other.isSetTx());
+      if (presenceOrder != 0) {
+        return presenceOrder;
+      }
+      if (!txPresent) {
+        return 0;
+      }
+      return org.apache.thrift.TBaseHelper.compareTo(this.tx, other.tx);
+    }
+
+    public _Fields fieldForId(int fieldId) { // null when the id matches no field
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { // decodes into this instance
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this); // codec is chosen by the protocol's scheme class
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { // encodes this instance
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this); // codec is chosen by the protocol's scheme class
+    }
+
+    /**
+     * Renders this struct as "commitTx_args(tx:VALUE)"; an unset tx is printed as "null".
+     */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("commitTx_args(");
+      sb.append("tx:");
+      if (this.tx == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.tx);
+      }
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields: nothing is checked here for this struct
+      // check for sub-struct validity: delegate to tx when it is present
+      if (tx != null) {
+        tx.validate();
+      }
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { // writes this struct to the stream using TCompactProtocol
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te); // the Thrift failure is kept as the cause
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { // reads this struct from the stream using TCompactProtocol
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te); // the Thrift failure is kept as the cause
+      }
+    }
+
+    private static class commitTx_argsStandardSchemeFactory implements SchemeFactory { // factory for the field-tagged codec
+      public commitTx_argsStandardScheme getScheme() {
+        return new commitTx_argsStandardScheme(); // a new scheme instance on every call
+      }
+    }
+
+    private static class commitTx_argsStandardScheme extends StandardScheme<commitTx_args> {
+
+      /**
+       * Decodes fields until the STOP marker: field 1 of type STRUCT populates tx,
+       * anything else is skipped; the struct is validated once decoding finishes.
+       */
+      public void read(org.apache.thrift.protocol.TProtocol iprot, commitTx_args struct) throws org.apache.thrift.TException {
+        iprot.readStructBegin();
+        org.apache.thrift.protocol.TField header = iprot.readFieldBegin();
+        while (header.type != org.apache.thrift.protocol.TType.STOP) {
+          boolean isTx = header.id == 1 && header.type == org.apache.thrift.protocol.TType.STRUCT;
+          if (isTx) {
+            struct.tx = new TTransaction();
+            struct.tx.read(iprot);
+            struct.setTxIsSet(true);
+          } else {
+            // Unknown field id, or field 1 carrying an unexpected wire type.
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, header.type);
+          }
+          iprot.readFieldEnd();
+          header = iprot.readFieldBegin();
+        }
+        iprot.readStructEnd();
+
+        // No primitive-field checks are performed here; run the struct-level validation.
+        struct.validate();
+      }
+
+      /**
+       * Validates the struct, then writes tx (only when non-null) followed by the field-stop marker.
+       */
+      public void write(org.apache.thrift.protocol.TProtocol oprot, commitTx_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        TTransaction pending = struct.tx;
+        if (pending != null) {
+          oprot.writeFieldBegin(TX_FIELD_DESC);
+          pending.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class commitTx_argsTupleSchemeFactory implements SchemeFactory { // factory for the tuple-encoding codec
+      public commitTx_argsTupleScheme getScheme() {
+        return new commitTx_argsTupleScheme(); // a new scheme instance on every call
+      }
+    }
+
+    private static class commitTx_argsTupleScheme extends TupleScheme<commitTx_args> {
+
+      /** Writes a 1-bit presence mask for tx, then tx itself when it is set. */
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, commitTx_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol tupleOut = (TTupleProtocol) prot;
+        boolean hasTx = struct.isSetTx();
+        BitSet presence = new BitSet();
+        presence.set(0, hasTx);
+        tupleOut.writeBitSet(presence, 1);
+        if (hasTx) {
+          struct.tx.write(tupleOut);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, commitTx_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol tupleIn = (TTupleProtocol) prot;
+        BitSet presence = tupleIn.readBitSet(1);
+        if (presence.get(0)) { // bit 0 flags that a tx payload follows
+          struct.tx = new TTransaction();
+          struct.tx.read(tupleIn);
+          struct.setTxIsSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class commitTx_result implements org.apache.thrift.TBase<commitTx_result, commitTx_result._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("commitTx_result");
+
+    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRUCT, (short)0);
+    private static final org.apache.thrift.protocol.TField E_FIELD_DESC = new org.apache.thrift.protocol.TField("e", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new commitTx_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new commitTx_resultTupleSchemeFactory());
+    }
+
+    public TBoolean success; // required
+    public TTransactionNotInProgressException e; // required
+
+    /** Enumerates the fields of this struct and offers lookups by Thrift field id and by field name. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      SUCCESS((short)0, "success"),
+      E((short)1, "e");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields constant : EnumSet.allOf(_Fields.class)) {
+          byName.put(constant.getFieldName(), constant);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it is not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        for (_Fields candidate : values()) {
+          if (candidate._thriftId == fieldId) {
+            return candidate;
+          }
+        }
+        return null;
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an
+       * IllegalArgumentException if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields match = findByThriftId(fieldId);
+        if (match != null) {
+          return match;
+        }
+        throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if it is not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // Read-only per-field metadata (name, requirement level, value type); also registered with FieldMetaData below.
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TBoolean.class)));
+      tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(commitTx_result.class, metaDataMap);
+    }
+
+    public commitTx_result() { // creates an instance with success and e both unset (null)
+    }
+
+    public commitTx_result(
+      TBoolean success,
+      TTransactionNotInProgressException e)
+    {
+      this();
+      this.success = success; // references are stored as given; no copies are made
+      this.e = e;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>: each field that is set is duplicated via its type's copy constructor.
+     */
+    public commitTx_result(commitTx_result other) {
+      if (other.isSetSuccess()) {
+        this.success = new TBoolean(other.success);
+      }
+      if (other.isSetE()) {
+        this.e = new TTransactionNotInProgressException(other.e);
+      }
+    }
+
+    public commitTx_result deepCopy() { // delegates to the copy constructor
+      return new commitTx_result(this);
+    }
+
+    @Override
+    public void clear() { // returns both fields to the unset state
+      this.success = null;
+      this.e = null;
+    }
+
+    public TBoolean getSuccess() { // null while the field is unset
+      return this.success;
+    }
+
+    public commitTx_result setSuccess(TBoolean success) { // fluent setter: returns this for chaining
+      this.success = success;
+      return this;
+    }
+
+    public void unsetSuccess() { // a null success is what isSetSuccess() treats as "unset"
+      this.success = null;
+    }
+
+    /** Returns true if field success is set (i.e. it is non-null) and false otherwise */
+    public boolean isSetSuccess() {
+      return this.success != null;
+    }
+
+    public void setSuccessIsSet(boolean value) { // false clears success; true is a no-op because presence is tracked by non-nullness
+      if (!value) {
+        this.success = null;
+      }
+    }
+
+    public TTransactionNotInProgressException getE() { // null while the field is unset
+      return this.e;
+    }
+
+    public commitTx_result setE(TTransactionNotInProgressException e) { // fluent setter: returns this for chaining
       this.e = e;
       return this;
     }
@@ -12166,13 +13338,13 @@ public class TTransactionServer {
             case 1: // TXNS
               if (schemeField.type == org.apache.thrift.protocol.TType.SET) {
                 {
-                  org.apache.thrift.protocol.TSet _set32 = iprot.readSetBegin();
-                  struct.txns = new HashSet<Long>(2*_set32.size);
-                  for (int _i33 = 0; _i33 < _set32.size; ++_i33)
+                  org.apache.thrift.protocol.TSet _set40 = iprot.readSetBegin();
+                  struct.txns = new HashSet<Long>(2*_set40.size);
+                  for (int _i41 = 0; _i41 < _set40.size; ++_i41)
                   {
-                    long _elem34; // required
-                    _elem34 = iprot.readI64();
-                    struct.txns.add(_elem34);
+                    long _elem42; // required
+                    _elem42 = iprot.readI64();
+                    struct.txns.add(_elem42);
                   }
                   iprot.readSetEnd();
                 }
@@ -12200,9 +13372,9 @@ public class TTransactionServer {
           oprot.writeFieldBegin(TXNS_FIELD_DESC);
           {
             oprot.writeSetBegin(new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.I64, struct.txns.size()));
-            for (long _iter35 : struct.txns)
+            for (long _iter43 : struct.txns)
             {
-              oprot.writeI64(_iter35);
+              oprot.writeI64(_iter43);
             }
             oprot.writeSetEnd();
           }
@@ -12233,9 +13405,9 @@ public class TTransactionServer {
         if (struct.isSetTxns()) {
           {
             oprot.writeI32(struct.txns.size());
-            for (long _iter36 : struct.txns)
+            for (long _iter44 : struct.txns)
             {
-              oprot.writeI64(_iter36);
+              oprot.writeI64(_iter44);
             }
           }
         }
@@ -12247,13 +13419,13 @@ public class TTransactionServer {
         BitSet incoming = iprot.readBitSet(1);
         if (incoming.get(0)) {
           {
-            org.apache.thrift.protocol.TSet _set37 = new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.I64, iprot.readI32());
-            struct.txns = new HashSet<Long>(2*_set37.size);
-            for (int _i38 = 0; _i38 < _set37.size; ++_i38)
+            org.apache.thrift.protocol.TSet _set45 = new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+            struct.txns = new HashSet<Long>(2*_set45.size);
+            for (int _i46 = 0; _i46 < _set45.size; ++_i46)
             {
-              long _elem39; // required
-              _elem39 = iprot.readI64();
-              struct.txns.add(_elem39);
+              long _elem47; // required
+              _elem47 = iprot.readI64();
+              struct.txns.add(_elem47);
             }
           }
           struct.setTxnsIsSet(true);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
index 0a8ed96..dd17431 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
@@ -88,6 +88,11 @@ public class DetachedTxSystemClient implements TransactionSystemClient {
   }
 
   @Override
+  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) { // never throws
+    return true; // unconditionally approves: neither tx nor changeIds is inspected
+  }
+
+  @Override
   public boolean commit(Transaction tx) {
     return true;
   }