You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2018/02/20 11:27:07 UTC

phoenix git commit: PHOENIX-4386 Calculate the estimatedSize of MutationState using Map> mutations(Thomas D'Silva)

Repository: phoenix
Updated Branches:
  refs/heads/5.x-HBase-2.0 6cadbab92 -> aeb33b9fb


PHOENIX-4386 Calculate the estimatedSize of MutationState using Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations(Thomas D'Silva)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aeb33b9f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aeb33b9f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aeb33b9f

Branch: refs/heads/5.x-HBase-2.0
Commit: aeb33b9fbae9d19da199fed8e54d60939bdd57d8
Parents: 6cadbab
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Tue Feb 20 16:56:43 2018 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Tue Feb 20 16:56:43 2018 +0530

----------------------------------------------------------------------
 .../apache/phoenix/end2end/MutationStateIT.java | 144 +++++++++++++++++
 .../org/apache/phoenix/end2end/QueryMoreIT.java |  42 -----
 .../apache/phoenix/execute/PartialCommitIT.java |   3 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |  18 +--
 .../apache/phoenix/compile/UpsertCompiler.java  |  11 +-
 .../apache/phoenix/execute/MutationState.java   | 159 +++++++++++++------
 .../java/org/apache/phoenix/util/IndexUtil.java |   4 +-
 .../phoenix/util/PhoenixKeyValueUtil.java       |  48 ++----
 8 files changed, 289 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
new file mode 100644
index 0000000..2d5f360
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -0,0 +1,144 @@
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.junit.Test;
+
+public class MutationStateIT extends ParallelStatsDisabledIT {
+
+    private static final String DDL =
+            " (ORGANIZATION_ID CHAR(15) NOT NULL, SCORE DOUBLE, "
+            + "ENTITY_ID CHAR(15) NOT NULL, TAGS VARCHAR, CONSTRAINT PAGE_SNAPSHOT_PK "
+            + "PRIMARY KEY (ORGANIZATION_ID, ENTITY_ID DESC)) MULTI_TENANT=TRUE";
+
+    private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
+        PreparedStatement stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score) values (?,?,?)");
+        for (int i = 0; i < 10000; i++) {
+            stmt.setString(1, "AAAA" + i);
+            stmt.setString(2, "BBBB" + i);
+            stmt.setInt(3, 1);
+            stmt.execute();
+        }
+    }
+
+    @Test
+    public void testMaxMutationSize() throws Exception {
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3");
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000");
+        PhoenixConnection connection =
+                (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
+        String fullTableName = generateUniqueName();
+        try (Statement stmt = connection.createStatement()) {
+            stmt.execute(
+                "CREATE TABLE " + fullTableName + DDL);
+        }
+        try {
+            upsertRows(connection, fullTableName);
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(),
+                e.getErrorCode());
+        }
+
+        // set the max mutation size (bytes) to a low value
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000");
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4");
+        connection =
+                (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
+        try {
+            upsertRows(connection, fullTableName);
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(),
+                e.getErrorCode());
+        }
+    }
+
+    @Test
+    public void testMutationEstimatedSize() throws Exception {
+        PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl());
+        conn.setAutoCommit(false);
+        String fullTableName = generateUniqueName();
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                "CREATE TABLE " + fullTableName + DDL);
+        }
+
+        // upserting rows should increase the mutation state size
+        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+        long prevEstimatedSize = state.getEstimatedSize();
+        upsertRows(conn, fullTableName);
+        assertTrue("Mutation state size should have increased",
+            state.getEstimatedSize() > prevEstimatedSize);
+        
+        
+        // after commit or rollback the size should be zero
+        conn.commit();
+        assertEquals("Mutation state size should be zero after commit", 0,
+            state.getEstimatedSize());
+        upsertRows(conn, fullTableName);
+        conn.rollback();
+        assertEquals("Mutation state size should be zero after rollback", 0,
+            state.getEstimatedSize());
+
+        // upsert one row
+        PreparedStatement stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score) values (?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.execute();
+        assertTrue("Mutation state size should be greater than zero ", state.getEstimatedSize()>0);
+
+        prevEstimatedSize = state.getEstimatedSize();
+        // upserting the same row twice should not increase the size
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.execute();
+        assertEquals(
+            "Mutation state size should only increase 4 bytes (size of the new statement index)",
+            prevEstimatedSize + 4, state.getEstimatedSize());
+        
+        prevEstimatedSize = state.getEstimatedSize();
+        // changing the value of one column of a row to a larger value should increase the estimated size 
+        stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score, tags) values (?,?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.setString(4, "random text string random text string random text string");
+        stmt.execute();
+        assertTrue("Mutation state size should increase", prevEstimatedSize+4 < state.getEstimatedSize());
+        
+        prevEstimatedSize = state.getEstimatedSize();
+        // changing the value of one column of a row to a smaller value should decrease the estimated size 
+        stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score, tags) values (?,?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.setString(4, "");
+        stmt.execute();
+        assertTrue("Mutation state size should decrease", prevEstimatedSize+4 > state.getEstimatedSize());
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index 77cb19f..9109c12 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.Date;
@@ -39,7 +38,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -510,46 +508,6 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
         assertEquals(4L, connection.getMutationState().getBatchCount());
     }
     
-    @Test
-    public void testMaxMutationSize() throws Exception {
-        Properties connectionProperties = new Properties();
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3");
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000");
-        PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
-        String fullTableName = generateUniqueName();
-        try (Statement stmt = connection.createStatement()) {
-            stmt.execute("CREATE TABLE " + fullTableName + "(\n" +
-                "    ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
-                "    SCORE DOUBLE NOT NULL,\n" +
-                "    ENTITY_ID CHAR(15) NOT NULL\n" +
-                "    CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" +
-                "        ORGANIZATION_ID,\n" +
-                "        SCORE DESC,\n" +
-                "        ENTITY_ID DESC\n" +
-                "    )\n" +
-                ") MULTI_TENANT=TRUE");
-        }
-        try {
-            upsertRows(connection, fullTableName);
-            fail();
-        }
-        catch(SQLException e) {
-            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), e.getErrorCode());
-        }
-        
-        // set the max mutation size (bytes) to a low value
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000");
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4");
-        connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
-        try {
-            upsertRows(connection, fullTableName);
-            fail();
-        }
-        catch(SQLException e) {
-            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), e.getErrorCode());
-        }
-    }
-
     private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
         PreparedStatement stmt = conn.prepareStatement("upsert into " + fullTableName +
                 " (organization_id, entity_id, score) values (?,?,?)");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 2ceac55..58dcceb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.phoenix.end2end.BaseOwnClusterIT;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -284,7 +285,7 @@ public class PartialCommitIT extends BaseOwnClusterIT {
     private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException {
         Connection con = driver.connect(url, new Properties());
         PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
-        final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
+        final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator());
         // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
         return new PhoenixConnection(phxCon, null) {
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 53fc398..a635c69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
@@ -92,7 +93,6 @@ import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.sun.istack.NotNull;
 
 public class DeleteCompiler {
@@ -122,14 +122,14 @@ public class DeleteCompiler {
         final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
         final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
         final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
-        Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
-        List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null;
+        MultiRowMutationState mutations = new MultiRowMutationState(batchSize);
+        List<MultiRowMutationState> indexMutations = null;
         // If indexTableRef is set, we're deleting the rows from both the index table and
         // the data table through a single query to save executing an additional one.
         if (!otherTableRefs.isEmpty()) {
             indexMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size());
             for (int i = 0; i < otherTableRefs.size(); i++) {
-                indexMutations.add(Maps.<ImmutableBytesPtr,RowMutationState>newHashMapWithExpectedSize(batchSize));
+                indexMutations.add(new MultiRowMutationState(batchSize));
             }
         }
         List<PColumn> pkColumns = table.getPKColumns();
@@ -208,7 +208,7 @@ public class DeleteCompiler {
                 // row key will already have its value.
                 // Check for otherTableRefs being empty required when deleting directly from the index
                 if (otherTableRefs.isEmpty() || table.getIndexType() != IndexType.LOCAL) {
-                    mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
+                    mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                 }
                 for (int i = 0; i < otherTableRefs.size(); i++) {
                     PTable otherTable = otherTableRefs.get(i).getTable();
@@ -222,7 +222,7 @@ public class DeleteCompiler {
                     } else {
                         indexPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP));
                     }
-                    indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
+                    indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                 }
                 if (mutations.size() > maxSize) {
                     throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -239,7 +239,7 @@ public class DeleteCompiler {
                     connection.getMutationState().send();
                     mutations.clear();
                     if (indexMutations != null) {
-                        for (Map<ImmutableBytesPtr, RowMutationState> multiRowMutationState : indexMutations) {
+                        for (MultiRowMutationState multiRowMutationState : indexMutations) {
                             multiRowMutationState.clear();
                         }
                     }
@@ -651,10 +651,10 @@ public class DeleteCompiler {
             // keys for our ranges
             ScanRanges ranges = context.getScanRanges();
             Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
-            Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
+            MultiRowMutationState mutation = new MultiRowMutationState(ranges.getPointLookupCount());
             while (iterator.hasNext()) {
                 mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()),
-                        new RowMutationState(PRow.DELETE_MARKER,
+                        new RowMutationState(PRow.DELETE_MARKER, 0,
                                 statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
             }
             return new MutationState(dataPlan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7e83ad5..d827cbe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
 import org.apache.phoenix.expression.Determinism;
@@ -116,9 +117,10 @@ import com.google.common.collect.Sets;
 public class UpsertCompiler {
 
     private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
-            PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
+            PTable table, MultiRowMutationState mutation,
             PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
             byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException {
+        long columnValueSize = 0;
         Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -148,6 +150,7 @@ public class UpsertCompiler {
                 }
             } else {
                 columnValues.put(column, value);
+                columnValueSize += (column.getEstimatedSize() + value.length);
             }
         }
         ImmutableBytesPtr ptr = new ImmutableBytesPtr();
@@ -166,7 +169,7 @@ public class UpsertCompiler {
                     regionPrefix.length));
             }
         } 
-        mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
+        mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
     }
     
     public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
@@ -195,7 +198,7 @@ public class UpsertCompiler {
             }
         }
         int rowCount = 0;
-        Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+        MultiRowMutationState mutation = new MultiRowMutationState(batchSize);
         PTable table = tableRef.getTable();
         IndexMaintainer indexMaintainer = null;
         byte[][] viewConstants = null;
@@ -1180,7 +1183,7 @@ public class UpsertCompiler {
                     throw new IllegalStateException();
                 }
             }
-            Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
+            MultiRowMutationState mutation = new MultiRowMutationState(1);
             IndexMaintainer indexMaintainer = null;
             byte[][] viewConstants = null;
             if (table.getIndexType() == IndexType.LOCAL) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index e9547f2..510e609 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -97,6 +97,7 @@ import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,7 +124,7 @@ public class MutationState implements SQLCloseable {
     private final long batchSize;
     private final long batchSizeBytes;
     private long batchCount = 0L;
-    private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
+    private final Map<TableRef, MultiRowMutationState> mutations;
     private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10);
 
     private long sizeOffset;
@@ -131,7 +132,7 @@ public class MutationState implements SQLCloseable {
     private long estimatedSize = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
-    private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
+    private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
 
     final PhoenixTransactionContext phoenixTransactionContext;
 
@@ -159,12 +160,12 @@ public class MutationState implements SQLCloseable {
     }
 
     private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) {
-        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, txContext);
+        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, MultiRowMutationState>newHashMapWithExpectedSize(5), subTask, txContext);
         this.sizeOffset = sizeOffset;
     }
 
     MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
+            Map<TableRef, MultiRowMutationState> mutations,
             boolean subTask, PhoenixTransactionContext txContext) {
         this.maxSize = maxSize;
         this.maxSizeBytes = maxSizeBytes;
@@ -189,15 +190,19 @@ public class MutationState implements SQLCloseable {
         }
     }
 
-    public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
+    public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection)  throws SQLException {
         this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
         if (!mutations.isEmpty()) {
             this.mutations.put(table, mutations);
         }
         this.numRows = mutations.size();
-        this.estimatedSize = PhoenixKeyValueUtil.getEstimatedRowSize(table, mutations);
+        this.estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(this.mutations);
         throwIfTooBig();
     }
+    
+    public long getEstimatedSize() {
+        return estimatedSize;
+    }
 
     public long getMaxSize() {
         return maxSize;
@@ -346,7 +351,7 @@ public class MutationState implements SQLCloseable {
     }
 
     public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), false, null);
+        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, MultiRowMutationState>emptyMap(), false, null);
         state.sizeOffset = 0;
         return state;
     }
@@ -368,12 +373,12 @@ public class MutationState implements SQLCloseable {
         return sizeOffset + numRows;
     }
     
-    private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr,RowMutationState> srcRows,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+    private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows,
+            Map<TableRef, MultiRowMutationState> dstMutations) {
         PTable table = tableRef.getTable();
         boolean isIndex = table.getType() == PTableType.INDEX;
         boolean incrementRowCount = dstMutations == this.mutations;
-        Map<ImmutableBytesPtr,RowMutationState> existingRows = dstMutations.put(tableRef, srcRows);
+        MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows);
         if (existingRows != null) { // Rows for that table already exist
             // Loop through new rows and replace existing with new
             for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : srcRows.entrySet()) {
@@ -385,8 +390,12 @@ public class MutationState implements SQLCloseable {
                         Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
                         // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. 
                         if (newRow != PRow.DELETE_MARKER) {
+                            // decrement estimated size by the size of the old row 
+                            estimatedSize-=existingRowMutationState.calculateEstimatedSize();
                             // Merge existing column values with new column values
                             existingRowMutationState.join(rowEntry.getValue());
+                            // increment estimated size by the size of the new row
+                            estimatedSize+=existingRowMutationState.calculateEstimatedSize();
                             // Now that the existing row has been merged with the new row, replace it back
                             // again (since it was merged with the new one above).
                             existingRows.put(rowEntry.getKey(), existingRowMutationState);
@@ -395,6 +404,8 @@ public class MutationState implements SQLCloseable {
                 } else {
                     if (incrementRowCount && !isIndex) { // Don't count index rows in row count
                         numRows++;
+                        // increment estimated size by the size of the new row
+                        estimatedSize += rowEntry.getValue().calculateEstimatedSize();
                     }
                 }
             }
@@ -402,22 +413,25 @@ public class MutationState implements SQLCloseable {
             dstMutations.put(tableRef, existingRows);
         } else {
             // Size new map at batch size as that's what it'll likely grow to.
-            Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
+            MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize());
             newRows.putAll(srcRows);
             dstMutations.put(tableRef, newRows);
             if (incrementRowCount && !isIndex) {
                 numRows += srcRows.size();
+                // if we added all the rows from newMutationState we can just increment the
+                // estimatedSize by newMutationState.estimatedSize
+                estimatedSize +=  srcRows.estimatedSize;
             }
         }
     }
     
-    private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> srcMutations, 
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+    private void joinMutationState(Map<TableRef, MultiRowMutationState> srcMutations, 
+            Map<TableRef, MultiRowMutationState> dstMutations) {
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : srcMutations.entrySet()) {
+        for (Map.Entry<TableRef, MultiRowMutationState> entry : srcMutations.entrySet()) {
             // Replace existing entries for the table with new entries
             TableRef tableRef = entry.getKey();
-            Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue();
+            MultiRowMutationState srcRows = entry.getValue();
             joinMutationState(tableRef, srcRows, dstMutations);
         }
     }
@@ -435,12 +449,7 @@ public class MutationState implements SQLCloseable {
         phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
 
         this.sizeOffset += newMutationState.sizeOffset;
-        int oldNumRows = this.numRows;
         joinMutationState(newMutationState.mutations, this.mutations);
-        // here we increment the estimated size by the fraction of new rows we added from the newMutationState 
-        if (newMutationState.numRows>0) {
-            this.estimatedSize += ((double)(this.numRows-oldNumRows)/newMutationState.numRows) * newMutationState.estimatedSize;
-        }
         if (!newMutationState.txMutations.isEmpty()) {
             if (txMutations.isEmpty()) {
                 txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
@@ -478,7 +487,7 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }
     
-    private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values,
+    private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values,
             final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) { 
         final PTable table = tableRef.getTable();
         final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
@@ -513,10 +522,10 @@ public class MutationState implements SQLCloseable {
                     // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
                     if (!sendAll) {
                         TableRef key = new TableRef(index);
-                        Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key);
-                        if (rowToColumnMap!=null) {
+                        MultiRowMutationState multiRowMutationState = mutations.remove(key);
+                        if (multiRowMutationState!=null) {
                             final List<Mutation> deleteMutations = Lists.newArrayList();
-                            generateMutations(tableRef, mutationTimestamp, serverTimestamp, rowToColumnMap, deleteMutations, null);
+                            generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
                             indexMutations.addAll(deleteMutations);
                         }
                     }
@@ -535,14 +544,14 @@ public class MutationState implements SQLCloseable {
     }
 
     private void generateMutations(final TableRef tableRef, final long mutationTimestamp,
-            final long serverTimestamp, final Map<ImmutableBytesPtr, RowMutationState> values,
+            final long serverTimestamp, final MultiRowMutationState values,
             final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
         final PTable table = tableRef.getTable();
         boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
         Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator =
                 values.entrySet().iterator();
         long timestampToUse = mutationTimestamp;
-        Map<ImmutableBytesPtr, RowMutationState> modifiedValues = Maps.newHashMap();
+        MultiRowMutationState modifiedValues = new MultiRowMutationState(16);
         while (iterator.hasNext()) {
             Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next();
             byte[] onDupKeyBytes = rowEntry.getValue().getOnDupKeyBytes();
@@ -617,7 +626,7 @@ public class MutationState implements SQLCloseable {
     }
     
     public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) {
-        final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
+        final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator = this.mutations.entrySet().iterator();
         if (!iterator.hasNext()) {
             return Collections.emptyIterator();
         }
@@ -625,7 +634,7 @@ public class MutationState implements SQLCloseable {
         final long serverTimestamp = getTableTimestamp(tableTimestamp, scn);
         final long mutationTimestamp = getMutationTimestamp(scn);
         return new Iterator<Pair<byte[],List<Mutation>>>() {
-            private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
+            private Map.Entry<TableRef, MultiRowMutationState> current = iterator.next();
             private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
                     
             private Iterator<Pair<byte[],List<Mutation>>> init() {
@@ -689,14 +698,14 @@ public class MutationState implements SQLCloseable {
     private long[] validateAll() throws SQLException {
         int i = 0;
         long[] timeStamps = new long[this.mutations.size()];
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
+        for (Map.Entry<TableRef, MultiRowMutationState> entry : mutations.entrySet()) {
             TableRef tableRef = entry.getKey();
             timeStamps[i++] = validateAndGetServerTimestamp(tableRef, entry.getValue());
         }
         return timeStamps;
     }
     
-    private long validateAndGetServerTimestamp(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
+    private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap) throws SQLException {
         Long scn = connection.getSCN();
         MetaDataClient client = new MetaDataClient(connection);
         long serverTimeStamp = tableRef.getTimeStamp();
@@ -907,7 +916,7 @@ public class MutationState implements SQLCloseable {
             sendAll = true;
         }
 
-        Map<ImmutableBytesPtr, RowMutationState> valuesMap;
+        MultiRowMutationState multiRowMutationState;
         Map<TableInfo,List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap(); 
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
@@ -916,16 +925,16 @@ public class MutationState implements SQLCloseable {
             while (tableRefIterator.hasNext()) {
                 // at this point we are going through mutations for each table
                 final TableRef tableRef = tableRefIterator.next();
-                valuesMap = mutations.get(tableRef);
-                if (valuesMap == null || valuesMap.isEmpty()) {
+                multiRowMutationState = mutations.get(tableRef);
+                if (multiRowMutationState == null || multiRowMutationState.isEmpty()) {
                     continue;
                 }
                 // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
-                long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, valuesMap) : serverTimeStamps[i++];
+                long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, multiRowMutationState) : serverTimeStamps[i++];
                 Long scn = connection.getSCN();
                 long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
                 final PTable table = tableRef.getTable();
-                Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, mutationTimestamp, serverTimestamp, false, sendAll);
+                Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll);
                 // build map from physical table to mutation list
                 boolean isDataTable = true;
                 while (mutationsIterator.hasNext()) {
@@ -943,7 +952,7 @@ public class MutationState implements SQLCloseable {
                 // involved in the transaction since none of them would have been
                 // committed in the event of a failure.
                 if (table.isTransactional()) {
-                    addUncommittedStatementIndexes(valuesMap.values());
+                    addUncommittedStatementIndexes(multiRowMutationState.values());
                     if (txMutations.isEmpty()) {
                         txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
                     }
@@ -952,7 +961,7 @@ public class MutationState implements SQLCloseable {
                     // in the event that we need to replay the commit.
                     // Copy TableRef so we have the original PTable and know when the
                     // indexes have changed.
-                    joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
+                    joinMutationState(new TableRef(tableRef), multiRowMutationState, txMutations);
                 }
             }
             long serverTimestamp = HConstants.LATEST_TIMESTAMP;
@@ -974,8 +983,6 @@ public class MutationState implements SQLCloseable {
                 long mutationCommitTime = 0;
                 long numFailedMutations = 0;;
                 long startTime = 0;
-                long startNumRows = numRows;
-                long startEstimatedSize = estimatedSize;
                 do {
                     TableRef origTableRef = tableInfo.getOrigTableRef();
                     PTable table = origTableRef.getTable();
@@ -1022,13 +1029,13 @@ public class MutationState implements SQLCloseable {
                         GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
                         numFailedMutations = 0;
                         
+                        // Remove batches as we process them
+                        mutations.remove(origTableRef);
                         if (tableInfo.isDataTable()) {
                             numRows -= numMutations;
-                            // decrement estimated size by the fraction of rows we sent to hbase
-                            estimatedSize -= ((double)numMutations/startNumRows)*startEstimatedSize;
+                            // recalculate the estimated size
+                            estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(mutations);
                         }
-                        // Remove batches as we process them
-                        mutations.remove(origTableRef);
                     } catch (Exception e) {
                     	mutationCommitTime = System.currentTimeMillis() - startTime;
                         serverTimestamp = ServerUtil.parseServerTimestamp(e);
@@ -1179,7 +1186,7 @@ public class MutationState implements SQLCloseable {
     }
     
     private int[] getUncommittedStatementIndexes() {
-        for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) {
+        for (MultiRowMutationState rowMutationMap : mutations.values()) {
             addUncommittedStatementIndexes(rowMutationMap.values());
         }
         return uncommittedStatementIndexes;
@@ -1212,7 +1219,7 @@ public class MutationState implements SQLCloseable {
     }
 
     public void commit() throws SQLException {
-        Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
+        Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
         int retryCount = 0;
         do {
             boolean sendSuccessful=false;
@@ -1422,13 +1429,54 @@ public class MutationState implements SQLCloseable {
         }
     }
     
+    public static class MultiRowMutationState {
+        private Map<ImmutableBytesPtr,RowMutationState> rowKeyToRowMutationState;
+        private long estimatedSize;
+        
+        public MultiRowMutationState(int size) {
+            this.rowKeyToRowMutationState = Maps.newHashMapWithExpectedSize(size);
+            this.estimatedSize = 0;
+        }
+        
+        public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState rowMutationState) { 
+            estimatedSize += rowMutationState.calculateEstimatedSize();
+            return rowKeyToRowMutationState.put(ptr, rowMutationState);
+        }
+        
+        public void putAll(MultiRowMutationState other) {
+            estimatedSize += other.estimatedSize;
+            rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState);
+        }
+        
+        public boolean isEmpty() {
+            return rowKeyToRowMutationState.isEmpty();
+        }
+        
+        public int size() {
+            return rowKeyToRowMutationState.size();
+        }
+        
+        public Set<Entry<ImmutableBytesPtr, RowMutationState>> entrySet() {
+            return rowKeyToRowMutationState.entrySet();
+        }
+        
+        public void clear(){
+            rowKeyToRowMutationState.clear();
+        }
+        
+        public Collection<RowMutationState> values() {
+            return rowKeyToRowMutationState.values();
+        }
+    }
+    
     public static class RowMutationState {
         @Nonnull private Map<PColumn,byte[]> columnValues;
         private int[] statementIndexes;
         @Nonnull private final RowTimestampColInfo rowTsColInfo;
         private byte[] onDupKeyBytes;
+        private long colValuesSize;
         
-        public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
+        public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, long colValuesSize, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
                 byte[] onDupKeyBytes) {
             checkNotNull(columnValues);
             checkNotNull(rowTsColInfo);
@@ -1436,6 +1484,12 @@ public class MutationState implements SQLCloseable {
             this.statementIndexes = new int[] {statementIndex};
             this.rowTsColInfo = rowTsColInfo;
             this.onDupKeyBytes = onDupKeyBytes;
+            this.colValuesSize = colValuesSize;
+        }
+
+        public long calculateEstimatedSize() {
+            return colValuesSize + statementIndexes.length * SizedUtil.INT_SIZE + SizedUtil.LONG_SIZE
+                    + (onDupKeyBytes != null ? onDupKeyBytes.length : 0);
         }
 
         byte[] getOnDupKeyBytes() {
@@ -1454,7 +1508,16 @@ public class MutationState implements SQLCloseable {
             // If we already have a row and the new row has an ON DUPLICATE KEY clause
             // ignore the new values (as that's what the server will do).
             if (newRow.onDupKeyBytes == null) {
-                getColumnValues().putAll(newRow.getColumnValues());
+                // increment the column value size by the new row column value size
+                colValuesSize+=newRow.colValuesSize;
+                for (Map.Entry<PColumn,byte[]> entry : newRow.columnValues.entrySet()) {
+                    PColumn col = entry.getKey();
+                    byte[] oldValue = columnValues.put(col, entry.getValue());
+                    if (oldValue!=null) {
+                        // decrement column value size by the size of all column values that were replaced
+                        colValuesSize-=(col.getEstimatedSize() + oldValue.length);
+                    }
+                }
             }
             // Concatenate ON DUPLICATE KEY bytes to allow multiple
             // increments of the same row in the same commit batch.
@@ -1466,7 +1529,7 @@ public class MutationState implements SQLCloseable {
         RowTimestampColInfo getRowTimestampColInfo() {
             return rowTsColInfo;
         }
-       
+
     }
     
     public ReadMetricQueue getReadMetricQueue() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 5e97ce6..c6cbe3e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -73,7 +73,7 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.MutationState.RowMutationState;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -296,7 +296,7 @@ public class IndexUtil {
     }
     
     public static List<Mutation> generateIndexData(final PTable table, PTable index,
-            final Map<ImmutableBytesPtr, RowMutationState> valuesMap, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection)
+            final MultiRowMutationState multiRowMutationState, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection)
             throws SQLException {
         try {
         	final ImmutableBytesPtr ptr = new ImmutableBytesPtr();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
index 84525fd..ce5cb55 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -188,42 +189,15 @@ public class PhoenixKeyValueUtil {
      * @param mutations map from table to row to RowMutationState
      * @return estimated row size
      */
-    public static long
-            getEstimatedRowSize(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> mutations) {
+    public static long getEstimatedRowMutationSize(
+            Map<TableRef, MultiRowMutationState> tableMutationMap) {
         long size = 0;
-        PTable table = tableRef.getTable();
-        // iterate over rows
-        for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : mutations.entrySet()) {
-            int rowLength = rowEntry.getKey().getLength();
-            Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues();
-            switch (table.getImmutableStorageScheme()) {
-            case ONE_CELL_PER_COLUMN:
-                // iterate over columns
-                for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) {
-                    PColumn pColumn = colValueEntry.getKey();
-                    size +=
-                            KeyValue.getKeyValueDataStructureSize(rowLength,
-                                pColumn.getFamilyName().getBytes().length,
-                                pColumn.getColumnQualifierBytes().length,
-                                colValueEntry.getValue().length);
-                }
-                break;
-            case SINGLE_CELL_ARRAY_WITH_OFFSETS:
-                // we store all the column values in a single key value that contains all the
-                // column values followed by an offset array
-                size +=
-                        PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength,
-                            colValueMap);
-                break;
+        // iterate over table
+        for (Entry<TableRef, MultiRowMutationState> tableEntry : tableMutationMap.entrySet()) {
+            // iterate over rows
+            for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue().entrySet()) {
+                size += calculateRowMutationSize(rowEntry);
             }
-            // count the empty key value
-            Pair<byte[], byte[]> emptyKeyValueInfo =
-                    EncodedColumnsUtil.getEmptyKeyValueInfo(table);
-            size +=
-                    KeyValue.getKeyValueDataStructureSize(rowLength,
-                        SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(),
-                        emptyKeyValueInfo.getFirst().length,
-                        emptyKeyValueInfo.getSecond().length);
         }
         return size;
     }
@@ -237,4 +211,10 @@ public class PhoenixKeyValueUtil {
         }
         return KeyValueUtil.copyToNewKeyValue(c);
     }
+
+    private static long calculateRowMutationSize(Entry<ImmutableBytesPtr, RowMutationState> rowEntry) {
+        int rowLength = rowEntry.getKey().getLength();
+        long colValuesLength = rowEntry.getValue().calculateEstimatedSize();
+        return (rowLength + colValuesLength);
+    }
 }