You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by la...@apache.org on 2019/05/13 00:24:39 UTC

[phoenix] branch 4.x-HBase-1.3 updated: Allow transactional writes without buffering the entire transaction on the client.

This is an automated email from the ASF dual-hosted git repository.

larsh pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
     new 973251a  Allow transactional writes without buffering the entire transaction on the client.
973251a is described below

commit 973251aae7eef1453f758f6198f6f68cad845645
Author: Lars Hofhansl <la...@apache.org>
AuthorDate: Sun May 12 17:24:48 2019 -0700

    Allow transactional writes without buffering the entire transaction on the client.
---
 .../main/java/org/apache/phoenix/compile/DeleteCompiler.java | 12 +++++++-----
 .../phoenix/compile/MutatingParallelIteratorFactory.java     |  5 +++--
 .../main/java/org/apache/phoenix/compile/UpsertCompiler.java |  8 +++++---
 3 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 7e97db5..4483be4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -134,7 +134,9 @@ public class DeleteCompiler {
         if (tenantId != null) {
             tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId, table.getViewIndexId() != null);
         }
-        final boolean isAutoCommit = connection.getAutoCommit();
+        // we automatically flush the mutations when either auto commit is enabled, or
+        // the target table is transactional (in that case changes are not visible until we commit)
+        final boolean autoFlush = connection.getAutoCommit() || tableRef.getTable().isTransactional();
         ConnectionQueryServices services = connection.getQueryServices();
         final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
         final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
@@ -246,8 +248,8 @@ public class DeleteCompiler {
                     throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
                 }
                 rowCount++;
-                // Commit a batch if auto commit is true and we're at our batch size
-                if (isAutoCommit && rowCount % batchSize == 0) {
+                // Commit a batch if we are flushing automatically and we're at our batch size
+                if (autoFlush && rowCount % batchSize == 0) {
                     MutationState state = new MutationState(tableRef, mutations, 0, maxSize, maxSizeBytes, connection);
                     connection.getMutationState().join(state);
                     for (int i = 0; i < otherTableRefs.size(); i++) {
@@ -264,8 +266,8 @@ public class DeleteCompiler {
                 }
             }
 
-            // If auto commit is true, this last batch will be committed upon return
-            int nCommittedRows = isAutoCommit ? (rowCount / batchSize * batchSize) : 0;
+            // If auto flush is true, the rows of all full batches were already joined into the connection's mutation state above; only the last batch is left for this state
+            int nCommittedRows = autoFlush ? (rowCount / batchSize * batchSize) : 0;
             MutationState state = new MutationState(tableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection);
             for (int i = 0; i < otherTableRefs.size(); i++) {
                 MutationState indexState = new MutationState(otherTableRefs.get(i), otherMutations.get(i), 0, maxSize, maxSizeBytes, connection);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 8e63fa9..542ab67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -58,8 +58,9 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
         
         MutationState state = mutate(parentContext, iterator, clonedConnection);
         
-        long totalRowCount = state.getUpdateCount();
-        if (clonedConnection.getAutoCommit()) {
+        final long totalRowCount = state.getUpdateCount();
+        final boolean autoFlush = connection.getAutoCommit() || plan.getTableRef().getTable().isTransactional();
+        if (autoFlush) {
             clonedConnection.getMutationState().join(state);
             state = clonedConnection.getMutationState();
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 5509582..b237059 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -186,7 +186,9 @@ public class UpsertCompiler {
                 services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,
                     QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
         int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
-        boolean isAutoCommit = connection.getAutoCommit();
+        // we automatically flush the mutations when either auto commit is enabled, or
+        // the target table is transactional (in that case changes are not visible until we commit)
+        final boolean autoFlush = connection.getAutoCommit() || tableRef.getTable().isTransactional();
         int sizeOffset = 0;
         int numSplColumns =
                 (tableRef.getTable().isMultiTenant() ? 1 : 0)
@@ -245,7 +247,7 @@ public class UpsertCompiler {
                 setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null, numSplColumns);
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our batch size
-                if (isAutoCommit && rowCount % batchSize == 0) {
+                if (autoFlush && rowCount % batchSize == 0) {
                     MutationState state = new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
                     connection.getMutationState().join(state);
                     connection.getMutationState().send();
@@ -253,7 +255,7 @@ public class UpsertCompiler {
                 }
             }
 
-            if (isAutoCommit) {
+            if (autoFlush) {
                 // If auto commit is true, this last batch will be committed upon return
                 sizeOffset = rowCount / batchSize * batchSize;
             }