You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/11/16 05:00:27 UTC

phoenix git commit: PHOENIX-4381 Calculate the estimatedSize of MutationState incrementally

Repository: phoenix
Updated Branches:
  refs/heads/4.13-HBase-0.98 c455886c3 -> 92546891e


PHOENIX-4381 Calculate the estimatedSize of MutationState incrementally


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/92546891
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/92546891
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/92546891

Branch: refs/heads/4.13-HBase-0.98
Commit: 92546891e97aa47c17203b8a36a750e0006c3541
Parents: c455886
Author: Thomas D'Silva <td...@apache.org>
Authored: Wed Nov 15 18:54:04 2017 -0800
Committer: Thomas D'Silva <td...@apache.org>
Committed: Wed Nov 15 20:59:59 2017 -0800

----------------------------------------------------------------------
 .../apache/phoenix/execute/MutationState.java   | 15 ++++-
 .../org/apache/phoenix/util/KeyValueUtil.java   | 65 +++++++++-----------
 2 files changed, 43 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/92546891/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 29bd31a..1e5a8ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -127,6 +127,7 @@ public class MutationState implements SQLCloseable {
 
     private long sizeOffset;
     private int numRows = 0;
+    private long estimatedSize = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
     private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
@@ -194,6 +195,7 @@ public class MutationState implements SQLCloseable {
             this.mutations.put(table, mutations);
         }
         this.numRows = mutations.size();
+        this.estimatedSize = KeyValueUtil.getEstimatedRowSize(table, mutations);
         throwIfTooBig();
     }
 
@@ -355,7 +357,6 @@ public class MutationState implements SQLCloseable {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build()
                     .buildException();
         }
-        long estimatedSize = KeyValueUtil.getEstimatedRowSize(mutations);
         if (estimatedSize > maxSizeBytes) {
             resetState();
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED)
@@ -434,7 +435,12 @@ public class MutationState implements SQLCloseable {
         phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
 
         this.sizeOffset += newMutationState.sizeOffset;
+        int oldNumRows = this.numRows;
         joinMutationState(newMutationState.mutations, this.mutations);
+        // grow the estimated size by newMutationState.estimatedSize scaled by the fraction of its rows that increased our row count
+        if (newMutationState.numRows>0) {
+            this.estimatedSize += ((double)(this.numRows-oldNumRows)/newMutationState.numRows) * newMutationState.estimatedSize;
+        }
         if (!newMutationState.txMutations.isEmpty()) {
             if (txMutations.isEmpty()) {
                 txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
@@ -969,6 +975,8 @@ public class MutationState implements SQLCloseable {
                 long mutationCommitTime = 0;
                 long numFailedMutations = 0;;
                 long startTime = 0;
+                long startNumRows = numRows;
+                long startEstimatedSize = estimatedSize;
                 do {
                     TableRef origTableRef = tableInfo.getOrigTableRef();
                     PTable table = origTableRef.getTable();
@@ -1005,8 +1013,8 @@ public class MutationState implements SQLCloseable {
                         for (List<Mutation> mutationBatch : mutationBatchList) {
                             hTable.batch(mutationBatch);
                             batchCount++;
+                            if (logger.isDebugEnabled()) logger.debug("Sent batch of " + mutationBatch.size() + " for " + Bytes.toString(htableName));
                         }
-                        if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName));
                         child.stop();
                         child.stop();
                         shouldRetry = false;
@@ -1016,6 +1024,8 @@ public class MutationState implements SQLCloseable {
                         
                         if (tableInfo.isDataTable()) {
                             numRows -= numMutations;
+                            // reduce the estimated size by startEstimatedSize scaled by the fraction of the starting rows sent to HBase
+                            estimatedSize -= ((double)numMutations/startNumRows)*startEstimatedSize;
                         }
                         // Remove batches as we process them
                         mutations.remove(origTableRef);
@@ -1181,6 +1191,7 @@ public class MutationState implements SQLCloseable {
 
     private void resetState() {
         numRows = 0;
+        estimatedSize = 0;
         this.mutations.clear();
         resetTransactionalState();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92546891/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
index 4234df5..2dfe1b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -192,46 +192,41 @@ public class KeyValueUtil {
      * @return estimated row size
      */
     public static long
-            getEstimatedRowSize(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) {
+            getEstimatedRowSize(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> mutations) {
         long size = 0;
-        // iterate over tables
-        for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableEntry : mutations
-                .entrySet()) {
-            PTable table = tableEntry.getKey().getTable();
-            // iterate over rows
-            for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue()
-                    .entrySet()) {
-                int rowLength = rowEntry.getKey().getLength();
-                Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues();
-                switch (table.getImmutableStorageScheme()) {
-                case ONE_CELL_PER_COLUMN:
-                    // iterate over columns
-                    for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) {
-                        PColumn pColumn = colValueEntry.getKey();
-                        size +=
-                                KeyValue.getKeyValueDataStructureSize(rowLength,
-                                    pColumn.getFamilyName().getBytes().length,
-                                    pColumn.getColumnQualifierBytes().length,
-                                    colValueEntry.getValue().length);
-                    }
-                    break;
-                case SINGLE_CELL_ARRAY_WITH_OFFSETS:
-                    // we store all the column values in a single key value that contains all the
-                    // column values followed by an offset array
+        PTable table = tableRef.getTable();
+        // iterate over rows
+        for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : mutations.entrySet()) {
+            int rowLength = rowEntry.getKey().getLength();
+            Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues();
+            switch (table.getImmutableStorageScheme()) {
+            case ONE_CELL_PER_COLUMN:
+                // iterate over columns
+                for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) {
+                    PColumn pColumn = colValueEntry.getKey();
                     size +=
-                            PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength,
-                                colValueMap);
-                    break;
+                            KeyValue.getKeyValueDataStructureSize(rowLength,
+                                pColumn.getFamilyName().getBytes().length,
+                                pColumn.getColumnQualifierBytes().length,
+                                colValueEntry.getValue().length);
                 }
-                // count the empty key value
-                Pair<byte[], byte[]> emptyKeyValueInfo =
-                        EncodedColumnsUtil.getEmptyKeyValueInfo(table);
+                break;
+            case SINGLE_CELL_ARRAY_WITH_OFFSETS:
+                // we store all the column values in a single key value that contains all the
+                // column values followed by an offset array
                 size +=
-                        KeyValue.getKeyValueDataStructureSize(rowLength,
-                            SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(),
-                            emptyKeyValueInfo.getFirst().length,
-                            emptyKeyValueInfo.getSecond().length);
+                        PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength,
+                            colValueMap);
+                break;
             }
+            // count the empty key value
+            Pair<byte[], byte[]> emptyKeyValueInfo =
+                    EncodedColumnsUtil.getEmptyKeyValueInfo(table);
+            size +=
+                    KeyValue.getKeyValueDataStructureSize(rowLength,
+                        SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(),
+                        emptyKeyValueInfo.getFirst().length,
+                        emptyKeyValueInfo.getSecond().length);
         }
         return size;
     }