You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ja...@apache.org on 2018/10/02 15:12:36 UTC

[2/2] incubator-omid git commit: OMID-114 Prevent extra RPCs on TTable batch operations

OMID-114 Prevent extra RPCs on TTable batch operations


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/62acd247
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/62acd247
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/62acd247

Branch: refs/heads/phoenix-integration
Commit: 62acd247655304c5ed8b92e5ad4f9ccf75f8b07a
Parents: d0f2648
Author: James Taylor <ja...@apache.org>
Authored: Sun Sep 30 09:56:40 2018 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Oct 2 08:11:54 2018 -0700

----------------------------------------------------------------------
 .../org/apache/omid/transaction/TTable.java     | 62 ++++++++++++++------
 1 file changed, 45 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/62acd247/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index e2e0535..6472b22 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -227,6 +227,13 @@ public class TTable implements Closeable {
      * @throws IOException if a remote or network exception occurs.
      */
     public void delete(Transaction tx, Delete delete) throws IOException {
+        Put deleteP = deleteInternal(tx, delete);
+        if (!deleteP.isEmpty()) {
+            addMutation(deleteP);
+        }
+    }
+    
+    private Put deleteInternal(Transaction tx, Delete delete) throws IOException {
 
         throwExceptionIfOpSetsTimerange(delete);
 
@@ -294,10 +301,7 @@ public class TTable implements Closeable {
             }
         }
 
-        if (!deleteP.isEmpty()) {
-            addMutation(deleteP);
-        }
-
+        return deleteP;
     }
 
     public void markPutAsConflictFreeMutation(Put put) {
@@ -350,6 +354,11 @@ public class TTable implements Closeable {
      * @throws IOException if a remote or network exception occurs.
      */
     public void put(Transaction tx, Put put, boolean addShadowCell) throws IOException {
+        Put tsput = putInternal(tx, put, addShadowCell);
+        addMutation(tsput);
+    }
+    
+    private Put putInternal(Transaction tx, Put put, boolean addShadowCell) throws IOException {
 
         throwExceptionIfOpSetsTimerange(put);
 
@@ -392,11 +401,18 @@ public class TTable implements Closeable {
                 }
             }
         }
-        addMutation(tsput);
+        return tsput;
     }
 
     private void addMutation(Mutation m) throws IOException {
-        mutations.add(m);
+        this.mutations.add(m);
+        if (autoFlush) {
+            flushCommits();
+        }
+    }
+    
+    private void addMutations(List<Mutation> mutations) throws IOException {
+        this.mutations.addAll(mutations);
         if (autoFlush) {
             flushCommits();
         }
@@ -548,28 +564,35 @@ public class TTable implements Closeable {
      * @throws IOException if a remote or network exception occurs
      */
     public void put(Transaction transaction, List<Put> puts) throws IOException {
+        List<Mutation> mutations = new ArrayList<>(puts.size());
         for (Put put : puts) {
-            put(transaction, put, false);
+            mutations.add(putInternal(transaction, put, false));
         }
+        addMutations(mutations);
     }
 
     /**
-     * Transactional version of {@link Table#batch(List<? extends Row> mutations)}
+     * Transactional version of {@link Table#batch(List<? extends Row> rows)}
      *
      * @param transaction an instance of transaction to be used
-     * @param mutations        List of rows that must be instances of Put or Delete
+     * @param rows        List of rows that must be instances of Put or Delete
      * @throws IOException if a remote or network exception occurs
      */
-    public void batch(Transaction transaction, List<? extends Row> mutations) throws IOException {
-        for (Row mutation : mutations) {
-            if (mutation instanceof Put) {
-                put(transaction, (Put)mutation);
-            } else if (mutation instanceof Delete) {
-                delete(transaction, (Delete)mutation);
+    public void batch(Transaction transaction, List<? extends Row> rows) throws IOException {
+        List<Mutation> mutations = new ArrayList<>(rows.size());
+        for (Row row : rows) {
+            if (row instanceof Put) {
+                mutations.add(putInternal(transaction, (Put)row, false));
+            } else if (row instanceof Delete) {
+                Put deleteP = deleteInternal(transaction, (Delete)row);
+                if (!deleteP.isEmpty()) {
+                    mutations.add(deleteP);
+                }
             } else {
-                throw new UnsupportedOperationException("Unsupported mutation: " + mutation);
+                throw new UnsupportedOperationException("Unsupported mutation: " + row);
             }
         }
+        addMutations(mutations);
     }
 
     /**
@@ -580,9 +603,14 @@ public class TTable implements Closeable {
      * @throws IOException if a remote or network exception occurs
      */
     public void delete(Transaction transaction, List<Delete> deletes) throws IOException {
+        List<Mutation> mutations = new ArrayList<>(deletes.size());
         for (Delete delete : deletes) {
-            delete(transaction, delete);
+            Put deleteP = deleteInternal(transaction, delete);
+            if (!deleteP.isEmpty()) {
+                mutations.add(deleteP);
+            }
         }
+        addMutations(mutations);
     }
 
     /**