You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/11/18 19:44:25 UTC
[2/4] phoenix git commit: PHOENIX-4381 Calculate the estimatedSize of MutationState incrementally
PHOENIX-4381 Calculate the estimatedSize of MutationState incrementally
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b068aebf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b068aebf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b068aebf
Branch: refs/heads/4.x-HBase-1.1
Commit: b068aebf6c91bbb0a11c92e423cc287620eb0a13
Parents: f431894
Author: Thomas D'Silva <td...@apache.org>
Authored: Wed Nov 15 18:54:04 2017 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Sat Nov 18 11:42:53 2017 -0800
----------------------------------------------------------------------
.../apache/phoenix/execute/MutationState.java | 15 ++++-
.../org/apache/phoenix/util/KeyValueUtil.java | 65 +++++++++-----------
2 files changed, 43 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b068aebf/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 9c26575..9fc62c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -127,6 +127,7 @@ public class MutationState implements SQLCloseable {
private long sizeOffset;
private int numRows = 0;
+ private long estimatedSize = 0;
private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
private boolean isExternalTxContext = false;
private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
@@ -193,6 +194,7 @@ public class MutationState implements SQLCloseable {
this.mutations.put(table, mutations);
}
this.numRows = mutations.size();
+ this.estimatedSize = KeyValueUtil.getEstimatedRowSize(table, mutations);
throwIfTooBig();
}
@@ -355,7 +357,6 @@ public class MutationState implements SQLCloseable {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build()
.buildException();
}
- long estimatedSize = KeyValueUtil.getEstimatedRowSize(mutations);
if (estimatedSize > maxSizeBytes) {
resetState();
throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED)
@@ -434,7 +435,12 @@ public class MutationState implements SQLCloseable {
phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
this.sizeOffset += newMutationState.sizeOffset;
+ int oldNumRows = this.numRows;
joinMutationState(newMutationState.mutations, this.mutations);
+ // here we increment the estimated size by the fraction of new rows we added from the newMutationState
+ if (newMutationState.numRows>0) {
+ this.estimatedSize += ((double)(this.numRows-oldNumRows)/newMutationState.numRows) * newMutationState.estimatedSize;
+ }
if (!newMutationState.txMutations.isEmpty()) {
if (txMutations.isEmpty()) {
txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
@@ -969,6 +975,8 @@ public class MutationState implements SQLCloseable {
long mutationCommitTime = 0;
long numFailedMutations = 0;;
long startTime = 0;
+ long startNumRows = numRows;
+ long startEstimatedSize = estimatedSize;
do {
TableRef origTableRef = tableInfo.getOrigTableRef();
PTable table = origTableRef.getTable();
@@ -1005,8 +1013,8 @@ public class MutationState implements SQLCloseable {
for (List<Mutation> mutationBatch : mutationBatchList) {
hTable.batch(mutationBatch);
batchCount++;
+ if (logger.isDebugEnabled()) logger.debug("Sent batch of " + mutationBatch.size() + " for " + Bytes.toString(htableName));
}
- if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName));
child.stop();
child.stop();
shouldRetry = false;
@@ -1016,6 +1024,8 @@ public class MutationState implements SQLCloseable {
if (tableInfo.isDataTable()) {
numRows -= numMutations;
+ // decrement estimated size by the fraction of rows we sent to hbase
+ estimatedSize -= ((double)numMutations/startNumRows)*startEstimatedSize;
}
// Remove batches as we process them
mutations.remove(origTableRef);
@@ -1181,6 +1191,7 @@ public class MutationState implements SQLCloseable {
private void resetState() {
numRows = 0;
+ estimatedSize = 0;
this.mutations.clear();
resetTransactionalState();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b068aebf/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
index 4234df5..2dfe1b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -192,46 +192,41 @@ public class KeyValueUtil {
* @return estimated row size
*/
public static long
- getEstimatedRowSize(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) {
+ getEstimatedRowSize(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> mutations) {
long size = 0;
- // iterate over tables
- for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableEntry : mutations
- .entrySet()) {
- PTable table = tableEntry.getKey().getTable();
- // iterate over rows
- for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue()
- .entrySet()) {
- int rowLength = rowEntry.getKey().getLength();
- Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues();
- switch (table.getImmutableStorageScheme()) {
- case ONE_CELL_PER_COLUMN:
- // iterate over columns
- for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) {
- PColumn pColumn = colValueEntry.getKey();
- size +=
- KeyValue.getKeyValueDataStructureSize(rowLength,
- pColumn.getFamilyName().getBytes().length,
- pColumn.getColumnQualifierBytes().length,
- colValueEntry.getValue().length);
- }
- break;
- case SINGLE_CELL_ARRAY_WITH_OFFSETS:
- // we store all the column values in a single key value that contains all the
- // column values followed by an offset array
+ PTable table = tableRef.getTable();
+ // iterate over rows
+ for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : mutations.entrySet()) {
+ int rowLength = rowEntry.getKey().getLength();
+ Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues();
+ switch (table.getImmutableStorageScheme()) {
+ case ONE_CELL_PER_COLUMN:
+ // iterate over columns
+ for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) {
+ PColumn pColumn = colValueEntry.getKey();
size +=
- PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength,
- colValueMap);
- break;
+ KeyValue.getKeyValueDataStructureSize(rowLength,
+ pColumn.getFamilyName().getBytes().length,
+ pColumn.getColumnQualifierBytes().length,
+ colValueEntry.getValue().length);
}
- // count the empty key value
- Pair<byte[], byte[]> emptyKeyValueInfo =
- EncodedColumnsUtil.getEmptyKeyValueInfo(table);
+ break;
+ case SINGLE_CELL_ARRAY_WITH_OFFSETS:
+ // we store all the column values in a single key value that contains all the
+ // column values followed by an offset array
size +=
- KeyValue.getKeyValueDataStructureSize(rowLength,
- SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(),
- emptyKeyValueInfo.getFirst().length,
- emptyKeyValueInfo.getSecond().length);
+ PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength,
+ colValueMap);
+ break;
}
+ // count the empty key value
+ Pair<byte[], byte[]> emptyKeyValueInfo =
+ EncodedColumnsUtil.getEmptyKeyValueInfo(table);
+ size +=
+ KeyValue.getKeyValueDataStructureSize(rowLength,
+ SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(),
+ emptyKeyValueInfo.getFirst().length,
+ emptyKeyValueInfo.getSecond().length);
}
return size;
}