You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by la...@apache.org on 2019/05/13 00:24:39 UTC
[phoenix] branch 4.x-HBase-1.3 updated: Allow transactional writes without buffering the entire transaction on the client.
This is an automated email from the ASF dual-hosted git repository.
larsh pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
new 973251a Allow transactional writes without buffering the entire transaction on the client.
973251a is described below
commit 973251aae7eef1453f758f6198f6f68cad845645
Author: Lars Hofhansl <la...@apache.org>
AuthorDate: Sun May 12 17:24:48 2019 -0700
Allow transactional writes without buffering the entire transaction on the client.
---
.../main/java/org/apache/phoenix/compile/DeleteCompiler.java | 12 +++++++-----
.../phoenix/compile/MutatingParallelIteratorFactory.java | 5 +++--
.../main/java/org/apache/phoenix/compile/UpsertCompiler.java | 8 +++++---
3 files changed, 15 insertions(+), 10 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 7e97db5..4483be4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -134,7 +134,9 @@ public class DeleteCompiler {
if (tenantId != null) {
tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId, table.getViewIndexId() != null);
}
- final boolean isAutoCommit = connection.getAutoCommit();
+ // we automatically flush the mutations when either auto commit is enabled, or
+ // the target table is transactional (in that case changes are not visible until we commit)
+ final boolean autoFlush = connection.getAutoCommit() || tableRef.getTable().isTransactional();
ConnectionQueryServices services = connection.getQueryServices();
final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
@@ -246,8 +248,8 @@ public class DeleteCompiler {
throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
}
rowCount++;
- // Commit a batch if auto commit is true and we're at our batch size
- if (isAutoCommit && rowCount % batchSize == 0) {
+ // Commit a batch if we are flushing automatically and we're at our batch size
+ if (autoFlush && rowCount % batchSize == 0) {
MutationState state = new MutationState(tableRef, mutations, 0, maxSize, maxSizeBytes, connection);
connection.getMutationState().join(state);
for (int i = 0; i < otherTableRefs.size(); i++) {
@@ -264,8 +266,8 @@ public class DeleteCompiler {
}
}
- // If auto commit is true, this last batch will be committed upon return
- int nCommittedRows = isAutoCommit ? (rowCount / batchSize * batchSize) : 0;
+ // If auto flush is true, this last batch will be committed upon return
+ int nCommittedRows = autoFlush ? (rowCount / batchSize * batchSize) : 0;
MutationState state = new MutationState(tableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection);
for (int i = 0; i < otherTableRefs.size(); i++) {
MutationState indexState = new MutationState(otherTableRefs.get(i), otherMutations.get(i), 0, maxSize, maxSizeBytes, connection);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 8e63fa9..542ab67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -58,8 +58,9 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
MutationState state = mutate(parentContext, iterator, clonedConnection);
- long totalRowCount = state.getUpdateCount();
- if (clonedConnection.getAutoCommit()) {
+ final long totalRowCount = state.getUpdateCount();
+ final boolean autoFlush = connection.getAutoCommit() || plan.getTableRef().getTable().isTransactional();
+ if (autoFlush) {
clonedConnection.getMutationState().join(state);
state = clonedConnection.getMutationState();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 5509582..b237059 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -186,7 +186,9 @@ public class UpsertCompiler {
services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
- boolean isAutoCommit = connection.getAutoCommit();
+ // we automatically flush the mutations when either auto commit is enabled, or
+ // the target table is transactional (in that case changes are not visible until we commit)
+ final boolean autoFlush = connection.getAutoCommit() || tableRef.getTable().isTransactional();
int sizeOffset = 0;
int numSplColumns =
(tableRef.getTable().isMultiTenant() ? 1 : 0)
@@ -245,7 +247,7 @@ public class UpsertCompiler {
setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null, numSplColumns);
rowCount++;
// Commit a batch if auto commit is true and we're at our batch size
- if (isAutoCommit && rowCount % batchSize == 0) {
+ if (autoFlush && rowCount % batchSize == 0) {
MutationState state = new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
connection.getMutationState().join(state);
connection.getMutationState().send();
@@ -253,7 +255,7 @@ public class UpsertCompiler {
}
}
- if (isAutoCommit) {
+ if (autoFlush) {
// If auto commit is true, this last batch will be committed upon return
sizeOffset = rowCount / batchSize * batchSize;
}