You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ja...@apache.org on 2018/10/02 15:12:35 UTC

[1/2] incubator-omid git commit: OMID-113 Set conflict free option when constructing TTable

Repository: incubator-omid
Updated Branches:
  refs/heads/phoenix-integration d0f26489b -> cb5710702


OMID-113 Set conflict free option when constructing TTable


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/cb571070
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/cb571070
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/cb571070

Branch: refs/heads/phoenix-integration
Commit: cb57107024df8414a5f059fd9681f7995005d4b2
Parents: 62acd24
Author: James Taylor <ja...@apache.org>
Authored: Mon Oct 1 08:02:10 2018 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Oct 2 08:11:54 2018 -0700

----------------------------------------------------------------------
 .../org/apache/omid/transaction/TTable.java     | 97 +++++++++++++-------
 .../omid/transaction/TestTTableBehaviour.java   |  4 +-
 .../transaction/TestTransactionConflict.java    | 63 ++++++-------
 .../org/apache/omid/transaction/CellUtils.java  |  1 -
 4 files changed, 96 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/cb571070/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index 6472b22..b7952a4 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -75,49 +75,82 @@ public class TTable implements Closeable {
     
     private boolean autoFlush = true;
     
+    private final boolean conflictFree;
+    
     // ----------------------------------------------------------------------------------------------------------------
     // Construction
     // ----------------------------------------------------------------------------------------------------------------
 
     public TTable(Connection connection, byte[] tableName) throws IOException {
-        this(connection.getTable(TableName.valueOf(tableName)));
+        this(connection.getTable(TableName.valueOf(tableName)), false);
     }
 
     public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient) throws IOException {
-        this(connection.getTable(TableName.valueOf(tableName)), commitTableClient);
+        this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
     }
 
     public TTable(Connection connection, String tableName) throws IOException {
-        this(connection.getTable(TableName.valueOf(tableName)));
+        this(connection.getTable(TableName.valueOf(tableName)), false);
     }
 
     public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient) throws IOException {
-        this(connection.getTable(TableName.valueOf(tableName)), commitTableClient);
+        this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
     }
 
     public TTable(Table hTable) throws IOException {
-        this(hTable, hTable.getConfiguration().getBoolean("omid.server.side.filter", false));
+        this(hTable, hTable.getConfiguration().getBoolean("omid.server.side.filter", false), false);
+    }
+
+    public TTable(Connection connection, byte[] tableName, boolean conflictFree) throws IOException {
+        this(connection.getTable(TableName.valueOf(tableName)), conflictFree);
+    }
+
+    public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
+        this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, conflictFree);
+    }
+
+    public TTable(Connection connection, String tableName, boolean conflictFree) throws IOException {
+        this(connection.getTable(TableName.valueOf(tableName)), conflictFree);
+    }
+
+    public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
+        this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, conflictFree);
     }
 
-    public TTable(Table hTable, boolean serverSideFilter) throws IOException {
-        table = hTable;
-        mutations = new ArrayList<Mutation>();
+    public TTable(Table hTable, boolean conflictFree) throws IOException {
+        this(hTable, hTable.getConfiguration().getBoolean("omid.server.side.filter", false), conflictFree);
+    }
+
+    public TTable(Table hTable, SnapshotFilter snapshotFilter) throws IOException {
+        this(hTable, snapshotFilter, false);
+    }
+
+    public TTable(Table hTable, CommitTable.Client commitTableClient) throws IOException {
+        this(hTable, commitTableClient, false);
+    }
+
+    public TTable(Table hTable, boolean serverSideFilter, boolean conflictFree) throws IOException {
+        this.table = hTable;
+        this.conflictFree = conflictFree;
+        this.mutations = new ArrayList<Mutation>();
         this.serverSideFilter = serverSideFilter;
-        snapshotFilter = (serverSideFilter) ?  new AttributeSetSnapshotFilter(hTable) :
+        this.snapshotFilter = (serverSideFilter) ?  new AttributeSetSnapshotFilter(hTable) :
                 new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable));
     }
 
-    public TTable(Table hTable, SnapshotFilter snapshotFilter ) throws IOException {
-        table = hTable;
-        mutations = new ArrayList<Mutation>();
+    public TTable(Table hTable, SnapshotFilter snapshotFilter, boolean conflictFree) throws IOException {
+        this.table = hTable;
+        this.conflictFree = conflictFree;
+        this.mutations = new ArrayList<Mutation>();
         this.snapshotFilter = snapshotFilter;
     }
 
-    public TTable(Table hTable, CommitTable.Client commitTableClient) throws IOException {
-        table = hTable;
-        mutations = new ArrayList<Mutation>();
-        serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false);
-        snapshotFilter = (serverSideFilter) ?  new AttributeSetSnapshotFilter(hTable) :
+    public TTable(Table hTable, CommitTable.Client commitTableClient, boolean conflictFree) throws IOException {
+        this.table = hTable;
+        this.conflictFree = conflictFree;
+        this.mutations = new ArrayList<Mutation>();
+        this.serverSideFilter = table.getConfiguration().getBoolean("omid.server.side.filter", false);
+        this.snapshotFilter = (serverSideFilter) ?  new AttributeSetSnapshotFilter(hTable) :
                 new SnapshotFilterImpl(new HTableAccessWrapper(hTable, hTable), commitTableClient);
     }
 
@@ -196,12 +229,12 @@ public class TTable implements Closeable {
                 byte[] family = entryF.getKey();
                 for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) {
                     byte[] qualifier = entryQ.getKey();
-                    tx.addWriteSetElement(new HBaseCellId(this, deleteP.getRow(), family, qualifier,
+                    addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, qualifier,
                             tx.getWriteTimestamp()));
                 }
                 deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
                         HConstants.EMPTY_BYTE_ARRAY);
-                tx.addWriteSetElement(new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
+                addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
                                                 tx.getWriteTimestamp()));
             }
         }
@@ -213,7 +246,7 @@ public class TTable implements Closeable {
         for (byte[] family : fset) {
             deleteP.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(),
                     HConstants.EMPTY_BYTE_ARRAY);
-            tx.addWriteSetElement(new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
+            addWriteSetElement(tx, new HBaseCellId(this, deleteP.getRow(), family, CellUtils.FAMILY_DELETE_QUALIFIER,
                     tx.getWriteTimestamp()));
 
         }
@@ -260,7 +293,7 @@ public class TTable implements Closeable {
                                     CellUtil.cloneQualifier(cell),
                                     writeTimestamp,
                                     CellUtils.DELETE_TOMBSTONE);
-                        transaction.addWriteSetElement(
+                        addWriteSetElement(transaction,
                             new HBaseCellId(this,
                                             delete.getRow(),
                                             CellUtil.cloneFamily(cell),
@@ -277,7 +310,7 @@ public class TTable implements Closeable {
                                         CellUtil.cloneQualifier(cell),
                                         writeTimestamp,
                                         CellUtils.DELETE_TOMBSTONE);
-                            transaction.addWriteSetElement(
+                            addWriteSetElement(transaction,
                                 new HBaseCellId(this,
                                                 delete.getRow(),
                                                 CellUtil.cloneFamily(cell),
@@ -304,10 +337,6 @@ public class TTable implements Closeable {
         return deleteP;
     }
 
-    public void markPutAsConflictFreeMutation(Put put) {
-        put.setAttribute(CellUtils.CONFLICT_FREE_MUTATION, Bytes.toBytes(true));
-    }
-
     /**
      * Transactional version of {@link Table#put(Put put)}
      *
@@ -386,23 +415,27 @@ public class TTable implements Closeable {
                             kv.getTimestamp(),
                             Bytes.toBytes(kv.getTimestamp()));
                 } else {
-                    byte[] conflictFree = put.getAttribute(CellUtils.CONFLICT_FREE_MUTATION);
                     HBaseCellId cellId = new HBaseCellId(this,
                             CellUtil.cloneRow(kv),
                             CellUtil.cloneFamily(kv),
                             CellUtil.cloneQualifier(kv),
                             kv.getTimestamp());
 
-                    if (conflictFree != null && conflictFree[0]!=0) {
-                        transaction.addConflictFreeWriteSetElement(cellId);
-                    } else {
-                        transaction.addWriteSetElement(cellId);
-                    }
+                    addWriteSetElement(transaction, cellId);
                 }
             }
         }
         return tsput;
     }
+    
+    private void addWriteSetElement(HBaseTransaction transaction, HBaseCellId cellId) {
+        if (conflictFree) {
+            transaction.addConflictFreeWriteSetElement(cellId);
+        } else {
+            transaction.addWriteSetElement(cellId);
+        }
+        
+    }
 
     private void addMutation(Mutation m) throws IOException {
         this.mutations.add(m);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/cb571070/hbase-client/src/test/java/org/apache/omid/transaction/TestTTableBehaviour.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTTableBehaviour.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTTableBehaviour.java
index c58f414..9c217b4 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTTableBehaviour.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTTableBehaviour.java
@@ -42,7 +42,7 @@ public class TestTTableBehaviour {
     public void testUserOperationsDontAllowTimestampSpecification() throws Exception {
 
         // Component under test
-        TTable tt = new TTable(Mockito.mock(Table.class), false);
+        TTable tt = new TTable(Mockito.mock(Table.class), false, false);
 
         long randomTimestampValue = Bytes.toLong("deadbeef".getBytes());
 
@@ -117,7 +117,7 @@ public class TestTTableBehaviour {
         byte[] nonValidQualifier1 = "blahblah\u0080".getBytes(Charsets.UTF_8);
         byte[] validQualifierIncludingOldShadowCellSuffix = "blahblah:OMID_CTS".getBytes(Charsets.UTF_8);
 
-        TTable table = new TTable(Mockito.mock(Table.class), false);
+        TTable table = new TTable(Mockito.mock(Table.class), false, false);
 
         HBaseTransaction t1 = Mockito.mock(HBaseTransaction.class);
         Put put = new Put(row);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/cb571070/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
index abdd602..a462d56 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
@@ -307,7 +307,8 @@ public class TestTransactionConflict extends OmidTestBase {
     @Test(timeOut = 10_000)
     public void runTestWriteWriteConflictWithAdditionalConflictFreeWrites(ITestContext context) throws Exception {
         TransactionManager tm = newTransactionManager(context);
-        TTable tt = new TTable(connection, TEST_TABLE);
+        TTable tt1 = new TTable(connection, TEST_TABLE);
+        TTable tt2 = new TTable(connection, TEST_TABLE, true);
 
         Transaction t1 = tm.begin();
         LOG.info("Transaction created " + t1);
@@ -323,22 +324,20 @@ public class TestTransactionConflict extends OmidTestBase {
 
         Put p = new Put(row);
         p.addColumn(fam, col, data1);
-        tt.put(t1, p);
+        tt1.put(t1, p);
 
         Put p2 = new Put(row);
         p2.addColumn(fam, col, data2);
-        tt.put(t2, p2);
+        tt1.put(t2, p2);
 
         row = Bytes.toBytes("test-simple-cf");
         p = new Put(row);
         p.addColumn(fam, col, data1);
-        tt.markPutAsConflictFreeMutation(p);
-        tt.put(t1, p);
+        tt2.put(t1, p);
 
         p2 = new Put(row);
         p2.addColumn(fam, col, data2);
-        tt.markPutAsConflictFreeMutation(p2);
-        tt.put(t2, p2);
+        tt2.put(t2, p2);
 
         tm.commit(t2);
 
@@ -352,7 +351,8 @@ public class TestTransactionConflict extends OmidTestBase {
     @Test(timeOut = 10_000)
     public void runTestWriteWriteConflictFreeWrites(ITestContext context) throws Exception {
         TransactionManager tm = newTransactionManager(context);
-        TTable tt = new TTable(connection, TEST_TABLE);
+        TTable tt1 = new TTable(connection, TEST_TABLE);
+        TTable tt2 = new TTable(connection, TEST_TABLE, true);
 
         Transaction t1 = tm.begin();
         LOG.info("Transaction created " + t1);
@@ -368,24 +368,20 @@ public class TestTransactionConflict extends OmidTestBase {
 
         Put p = new Put(row);
         p.addColumn(fam, col, data1);
-        tt.markPutAsConflictFreeMutation(p);
-        tt.put(t1, p);
+        tt1.put(t1, p);
 
         Put p2 = new Put(row);
         p2.addColumn(fam, col, data2);
-        tt.markPutAsConflictFreeMutation(p2);
-        tt.put(t2, p2);
+        tt2.put(t2, p2);
 
         row = Bytes.toBytes("test-simple-cf");
         p = new Put(row);
         p.addColumn(fam, col, data1);
-        tt.markPutAsConflictFreeMutation(p);
-        tt.put(t1, p);
+        tt1.put(t1, p);
 
         p2 = new Put(row);
         p2.addColumn(fam, col, data2);
-        tt.markPutAsConflictFreeMutation(p2);
-        tt.put(t2, p2);
+        tt2.put(t2, p2);
 
         tm.commit(t2);
 
@@ -399,7 +395,8 @@ public class TestTransactionConflict extends OmidTestBase {
     @Test(timeOut = 10_000)
     public void runTestWriteWriteConflictFreeWritesWithOtherWrites(ITestContext context) throws Exception {
         TransactionManager tm = newTransactionManager(context);
-        TTable tt = new TTable(connection, TEST_TABLE);
+        TTable tt1 = new TTable(connection, TEST_TABLE);
+        TTable tt2 = new TTable(connection, TEST_TABLE, true);
 
         Transaction t1 = tm.begin();
         LOG.info("Transaction created " + t1);
@@ -416,22 +413,20 @@ public class TestTransactionConflict extends OmidTestBase {
 
         Put p = new Put(row);
         p.addColumn(fam, col, data1);
-        tt.put(t1, p);
+        tt1.put(t1, p);
 
         Put p2 = new Put(row1);
         p2.addColumn(fam, col, data2);
-        tt.put(t2, p2);
+        tt1.put(t2, p2);
 
         row = Bytes.toBytes("test-simple-cf");
         p = new Put(row);
         p.addColumn(fam, col, data1);
-        tt.markPutAsConflictFreeMutation(p);
-        tt.put(t1, p);
+        tt2.put(t1, p);
 
         p2 = new Put(row);
         p2.addColumn(fam, col, data2);
-        tt.markPutAsConflictFreeMutation(p2);
-        tt.put(t2, p2);
+        tt2.put(t2, p2);
 
         tm.commit(t2);
 
@@ -445,7 +440,8 @@ public class TestTransactionConflict extends OmidTestBase {
     @Test(timeOut = 10_000)
     public void runTestCleanupConflictFreeWritesAfterConflict(ITestContext context) throws Exception {
         TransactionManager tm = newTransactionManager(context);
-        TTable tt = new TTable(connection, TEST_TABLE);
+        TTable tt1 = new TTable(connection, TEST_TABLE);
+        TTable tt2 = new TTable(connection, TEST_TABLE, true);
 
         Transaction t1 = tm.begin();
         LOG.info("Transaction created " + t1);
@@ -462,34 +458,33 @@ public class TestTransactionConflict extends OmidTestBase {
 
         Put p = new Put(row);
         p.addColumn(fam, col, data1);
-        tt.put(t1, p);
+        tt1.put(t1, p);
 
         Get g = new Get(row).setMaxVersions();
         g.addColumn(fam, col);
-        Result r = tt.getHTable().get(g);
+        Result r = tt1.getHTable().get(g);
         assertEquals(r.size(), 1, "Unexpected size for read.");
         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
 
         Put p2 = new Put(row);
         p2.addColumn(fam, col, data2);
-        tt.put(t2, p2);
+        tt1.put(t2, p2);
 
         Put p3 = new Put(row1);
         p3.addColumn(fam, col, data2);
-        tt.markPutAsConflictFreeMutation(p3);
-        tt.put(t2, p3);
+        tt2.put(t2, p3);
 
-        r = tt.getHTable().get(g);
+        r = tt1.getHTable().get(g);
         assertEquals(r.size(), 2, "Unexpected size for read.");
-        r = tt.get(t2, g);
+        r = tt2.get(t2, g);
         assertEquals(r.size(),1, "Unexpected size for read.");
         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
 
         Get g1 = new Get(row1).setMaxVersions();
         g1.addColumn(fam, col);
-        r = tt.getHTable().get(g1);
+        r = tt1.getHTable().get(g1);
         assertEquals(r.size(), 1, "Unexpected size for read.");
 
         tm.commit(t1);
@@ -503,11 +498,11 @@ public class TestTransactionConflict extends OmidTestBase {
         }
         assertTrue(aborted, "Transaction didn't raise exception");
 
-        r = tt.getHTable().get(g);
+        r = tt1.getHTable().get(g);
         assertEquals(r.size(), 1, "Unexpected size for read.");
         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
-        r = tt.getHTable().get(g1);
+        r = tt1.getHTable().get(g1);
         assertEquals(r.size(), 0, "Unexpected size for read.");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/cb571070/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
index 61a3492..5177e7b 100644
--- a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
+++ b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
@@ -57,7 +57,6 @@ public final class CellUtils {
     public static final String TRANSACTION_ATTRIBUTE = "__OMID_TRANSACTION__";
     /**/
     public static final String CLIENT_GET_ATTRIBUTE = "__OMID_CLIENT_GET__";
-    public static final String CONFLICT_FREE_MUTATION = "__OMID_CONFLICT_FREE_MUTATION__";
 
     /**
      * Utility interface to get rid of the dependency on HBase server package


[2/2] incubator-omid git commit: OMID-114 Prevent extra RPCs on TTable batch operations

Posted by ja...@apache.org.
OMID-114 Prevent extra RPCs on TTable batch operations


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/62acd247
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/62acd247
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/62acd247

Branch: refs/heads/phoenix-integration
Commit: 62acd247655304c5ed8b92e5ad4f9ccf75f8b07a
Parents: d0f2648
Author: James Taylor <ja...@apache.org>
Authored: Sun Sep 30 09:56:40 2018 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Oct 2 08:11:54 2018 -0700

----------------------------------------------------------------------
 .../org/apache/omid/transaction/TTable.java     | 62 ++++++++++++++------
 1 file changed, 45 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/62acd247/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index e2e0535..6472b22 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -227,6 +227,13 @@ public class TTable implements Closeable {
      * @throws IOException if a remote or network exception occurs.
      */
     public void delete(Transaction tx, Delete delete) throws IOException {
+        Put deleteP = deleteInternal(tx, delete);
+        if (!deleteP.isEmpty()) {
+            addMutation(deleteP);
+        }
+    }
+    
+    private Put deleteInternal(Transaction tx, Delete delete) throws IOException {
 
         throwExceptionIfOpSetsTimerange(delete);
 
@@ -294,10 +301,7 @@ public class TTable implements Closeable {
             }
         }
 
-        if (!deleteP.isEmpty()) {
-            addMutation(deleteP);
-        }
-
+        return deleteP;
     }
 
     public void markPutAsConflictFreeMutation(Put put) {
@@ -350,6 +354,11 @@ public class TTable implements Closeable {
      * @throws IOException if a remote or network exception occurs.
      */
     public void put(Transaction tx, Put put, boolean addShadowCell) throws IOException {
+        Put tsput = putInternal(tx, put, addShadowCell);
+        addMutation(tsput);
+    }
+    
+    private Put putInternal(Transaction tx, Put put, boolean addShadowCell) throws IOException {
 
         throwExceptionIfOpSetsTimerange(put);
 
@@ -392,11 +401,18 @@ public class TTable implements Closeable {
                 }
             }
         }
-        addMutation(tsput);
+        return tsput;
     }
 
     private void addMutation(Mutation m) throws IOException {
-        mutations.add(m);
+        this.mutations.add(m);
+        if (autoFlush) {
+            flushCommits();
+        }
+    }
+    
+    private void addMutations(List<Mutation> mutations) throws IOException {
+        this.mutations.addAll(mutations);
         if (autoFlush) {
             flushCommits();
         }
@@ -548,28 +564,35 @@ public class TTable implements Closeable {
      * @throws IOException if a remote or network exception occurs
      */
     public void put(Transaction transaction, List<Put> puts) throws IOException {
+        List<Mutation> mutations = new ArrayList<>(puts.size());
         for (Put put : puts) {
-            put(transaction, put, false);
+            mutations.add(putInternal(transaction, put, false));
         }
+        addMutations(mutations);
     }
 
     /**
-     * Transactional version of {@link Table#batch(List<? extends Row> mutations)}
+     * Transactional version of {@link Table#batch(List<? extends Row> rows)}
      *
      * @param transaction an instance of transaction to be used
-     * @param mutations        List of rows that must be instances of Put or Delete
+     * @param rows        List of rows that must be instances of Put or Delete
      * @throws IOException if a remote or network exception occurs
      */
-    public void batch(Transaction transaction, List<? extends Row> mutations) throws IOException {
-        for (Row mutation : mutations) {
-            if (mutation instanceof Put) {
-                put(transaction, (Put)mutation);
-            } else if (mutation instanceof Delete) {
-                delete(transaction, (Delete)mutation);
+    public void batch(Transaction transaction, List<? extends Row> rows) throws IOException {
+        List<Mutation> mutations = new ArrayList<>(rows.size());
+        for (Row row : rows) {
+            if (row instanceof Put) {
+                mutations.add(putInternal(transaction, (Put)row, false));
+            } else if (row instanceof Delete) {
+                Put deleteP = deleteInternal(transaction, (Delete)row);
+                if (!deleteP.isEmpty()) {
+                    mutations.add(deleteP);
+                }
             } else {
-                throw new UnsupportedOperationException("Unsupported mutation: " + mutation);
+                throw new UnsupportedOperationException("Unsupported mutation: " + row);
             }
         }
+        addMutations(mutations);
     }
 
     /**
@@ -580,9 +603,14 @@ public class TTable implements Closeable {
      * @throws IOException if a remote or network exception occurs
      */
     public void delete(Transaction transaction, List<Delete> deletes) throws IOException {
+        List<Mutation> mutations = new ArrayList<>(deletes.size());
         for (Delete delete : deletes) {
-            delete(transaction, delete);
+            Put deleteP = deleteInternal(transaction, delete);
+            if (!deleteP.isEmpty()) {
+                mutations.add(deleteP);
+            }
         }
+        addMutations(mutations);
     }
 
     /**