You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/01/27 01:19:08 UTC

[24/26] phoenix git commit: PHOENIX-541 Make mutable batch size bytes-based instead of row-based (Geoffrey Jacoby)

PHOENIX-541 Make mutable batch size bytes-based instead of row-based (Geoffrey Jacoby)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a44d317e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a44d317e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a44d317e

Branch: refs/heads/calcite
Commit: a44d317e32d383c4964824448bf12dfb8ed8bfec
Parents: b9323e1
Author: Samarth <sa...@salesforce.com>
Authored: Thu Jan 26 15:31:43 2017 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Thu Jan 26 15:31:43 2017 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/QueryMoreIT.java | 35 ++++++++++++++
 .../end2end/UpsertSelectAutoCommitIT.java       |  2 +-
 .../apache/phoenix/end2end/UpsertSelectIT.java  |  2 +-
 .../phoenix/end2end/index/ImmutableIndexIT.java |  1 -
 .../org/apache/phoenix/tx/TransactionIT.java    |  2 +-
 .../org/apache/phoenix/tx/TxCheckpointIT.java   |  2 +-
 .../UngroupedAggregateRegionObserver.java       | 31 +++++++-----
 .../apache/phoenix/execute/MutationState.java   | 51 ++++++++++++++++++--
 .../apache/phoenix/jdbc/PhoenixConnection.java  | 10 +++-
 .../index/PhoenixIndexImportDirectMapper.java   | 19 +++++---
 .../org/apache/phoenix/query/QueryServices.java |  3 ++
 .../phoenix/query/QueryServicesOptions.java     |  7 ++-
 .../java/org/apache/phoenix/util/JDBCUtil.java  |  6 +++
 .../org/apache/phoenix/util/PhoenixRuntime.java | 12 ++++-
 .../apache/phoenix/jdbc/PhoenixDriverTest.java  |  9 ++++
 .../org/apache/phoenix/util/JDBCUtilTest.java   | 21 +++++++-
 16 files changed, 181 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index 2b27f00..a2dab16 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -37,6 +37,8 @@ import java.util.Properties;
 
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
@@ -471,4 +473,37 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
             assertFalse(rs.next());
         }
     }
+
+    @Test
+    public void testMutationBatch() throws Exception { // upserts 4 rows under a 1024-byte batch limit, then checks how many batches were sent
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "1024"); // byte-based batch threshold for this connection
+        PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); // NOTE(review): neither this connection nor the PreparedStatement below is closed
+        String fullTableName = generateUniqueName();
+        try (Statement stmt = connection.createStatement()) {
+            stmt.execute("CREATE TABLE " + fullTableName + "(\n" +
+                "    ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
+                "    SCORE DOUBLE NOT NULL,\n" +
+                "    ENTITY_ID CHAR(15) NOT NULL\n" +
+                "    CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" +
+                "        ORGANIZATION_ID,\n" +
+                "        SCORE DESC,\n" +
+                "        ENTITY_ID DESC\n" +
+                "    )\n" +
+                ") MULTI_TENANT=TRUE");
+        }
+        PreparedStatement stmt = connection.prepareStatement("upsert into " + fullTableName +
+            " (organization_id, entity_id, score) values (?,?,?)");
+        try {
+            for (int i = 0; i < 4; i++) {
+                stmt.setString(1, "AAAA" + i);
+                stmt.setString(2, "BBBB" + i);
+                stmt.setInt(3, 1);
+                stmt.execute();
+            }
+            connection.commit();
+        } catch (IllegalArgumentException expected) {} // NOTE(review): why an IAE is tolerated (and silently ignored) is not evident here; confirm intent
+        // With the 1024-byte limit above, the 4 committed rows are expected to go out as 2 hTable.batch() calls.
+        assertEquals(2L, connection.getMutationState().getBatchCount());
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
index 37482de..6b781a0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
@@ -152,7 +152,7 @@ public class UpsertSelectAutoCommitIT extends ParallelStatsDisabledIT {
     @Test
     public void testUpsertSelectDoesntSeeUpsertedData() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
         props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
         props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
         Connection conn = DriverManager.getConnection(getUrl(), props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 763f11b..f5905ee 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -1435,7 +1435,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
         long ts = nextTimestamp();
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
-        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
         props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
         props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
         Connection conn = DriverManager.getConnection(getUrl(), props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 3ee9721..bc301fa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -154,7 +154,6 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
             return;
         }
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(100));
         String tableName = "TBL_" + generateUniqueName();
         String indexName = "IND_" + generateUniqueName();
         String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 83128f1..bde5cc8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -603,7 +603,7 @@ public class TransactionIT extends ParallelStatsDisabledIT {
     @Test
     public void testParallelUpsertSelect() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
         props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
         props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
         Connection conn = DriverManager.getConnection(getUrl(), props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index 14bcd70..246ecd4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -80,7 +80,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
         String seqName = "SEQ_" + generateUniqueName();
         String fullTableName = SchemaUtil.getTableName(tableName, tableName);
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
         props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
         props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
         Connection conn = getConnection(props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 9ee0054..a888bb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
 import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT;
 import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT;
 
@@ -70,6 +71,7 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
@@ -343,6 +345,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
         
         int batchSize = 0;
+        long batchSizeBytes = 0L;
         List<Mutation> mutations = Collections.emptyList();
         boolean needToWrite = false;
         Configuration conf = c.getEnvironment().getConfiguration();
@@ -369,6 +372,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             // TODO: size better
             mutations = Lists.newArrayListWithExpectedSize(1024);
             batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            batchSizeBytes = env.getConfiguration().getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
         }
         Aggregators aggregators = ServerAggregators.deserialize(
                 scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), env.getConfiguration());
@@ -596,19 +601,23 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     mutations.add(put);
                                 }
                             }
-                            // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
-                            if (!mutations.isEmpty() && batchSize > 0 &&
-                                    mutations.size() % batchSize == 0) {
-                                commitBatch(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
-                                        txState);
-                                mutations.clear();
+                            // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
+                            List<List<Mutation>> batchMutationList =
+                                MutationState.getMutationBatchList(batchSize, batchSizeBytes, mutations);
+                            for (List<Mutation> batchMutations : batchMutationList) {
+                                commitBatch(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
+                                    txState);
+                                batchMutations.clear();
                             }
-                            // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
-                            if (!indexMutations.isEmpty() && batchSize > 0 &&
-                                    indexMutations.size() % batchSize == 0) {
-                                commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState);
-                                indexMutations.clear();
+                            mutations.clear();
+                            // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
+                            List<List<Mutation>> batchIndexMutationList =
+                                MutationState.getMutationBatchList(batchSize, batchSizeBytes, indexMutations);
+                            for (List<Mutation> batchIndexMutations : batchIndexMutationList) {
+                                commitBatch(region, batchIndexMutations, null, blockingMemStoreSize, null, txState);
+                                batchIndexMutations.clear();
                             }
+                            indexMutations.clear();
                         }
                         aggregators.aggregate(rowAggregators, result);
                         hasAny = true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 31ab7c9..4775d59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -126,6 +126,8 @@ public class MutationState implements SQLCloseable {
     
     private final PhoenixConnection connection;
     private final long maxSize;
+    private final long maxSizeBytes;
+    private long batchCount = 0L;
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
     private final List<TransactionAware> txAwares;
     private final TransactionContext txContext;
@@ -140,7 +142,7 @@ public class MutationState implements SQLCloseable {
     
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
-    
+
     public MutationState(long maxSize, PhoenixConnection connection) {
         this(maxSize,connection, null, null);
     }
@@ -171,6 +173,7 @@ public class MutationState implements SQLCloseable {
             Transaction tx, TransactionContext txContext) {
         this.maxSize = maxSize;
         this.connection = connection;
+        this.maxSizeBytes = connection.getMutateBatchSizeBytes();
         this.mutations = mutations;
         boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
         this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
@@ -743,12 +746,11 @@ public class MutationState implements SQLCloseable {
     public static long getMutationTimestamp(final Long tableTimestamp, Long scn) {
         return (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn);
     }
-        
+
     /**
      * Validates that the meta data is valid against the server meta data if we haven't yet done so.
      * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
      * has changed.
-     * @param connection
      * @return the server time to use for the upsert
      * @throws SQLException if the table or any columns no longer exist
      */
@@ -842,6 +844,15 @@ public class MutationState implements SQLCloseable {
             }
         }
     }
+
+    public long getMaxSizeBytes() { // byte threshold per batch, copied from connection.getMutateBatchSizeBytes() in the constructor
+        return maxSizeBytes;
+    }
+
+    public long getBatchCount() { // number of hTable.batch() calls this MutationState has issued so far
+        return batchCount;
+    }
+
     private class MetaDataAwareHTable extends DelegateHTable {
         private final TableRef tableRef;
         
@@ -1068,7 +1079,11 @@ public class MutationState implements SQLCloseable {
                         
                         long startTime = System.currentTimeMillis();
                         child.addTimelineAnnotation("Attempt " + retryCount);
-                        hTable.batch(mutationList);
+                        List<List<Mutation>> mutationBatchList = getMutationBatchList(maxSize, maxSizeBytes, mutationList);
+                        for (List<Mutation> mutationBatch : mutationBatchList) {
+                            hTable.batch(mutationBatch);
+                            batchCount++;
+                        }
                         if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName));
                         child.stop();
                         child.stop();
@@ -1132,6 +1147,34 @@ public class MutationState implements SQLCloseable {
         }
     }
 
+    /** Split mutations, in order, into batches bounded by a row count and by summed heapSize().
+     * @param maxSize max rows per batch; values below 1 disable the row limit
+     * @param maxSizeBytes max summed {@link Mutation#heapSize()} per batch; a single larger mutation gets a batch of its own
+     * @param allMutationList List of HBase mutations
+     * @return List of lists of mutations, empty when allMutationList is empty */
+    public static List<List<Mutation>> getMutationBatchList(long maxSize, long maxSizeBytes, List<Mutation> allMutationList) {
+        List<List<Mutation>> mutationBatchList = Lists.newArrayList();
+        List<Mutation> currentList = Lists.newArrayList();
+        long currentBatchSizeBytes = 0L;
+        for (Mutation mutation : allMutationList) {
+            long mutationSizeBytes = mutation.heapSize();
+            if (currentList.size() == maxSize || currentBatchSizeBytes + mutationSizeBytes > maxSizeBytes) {
+                if (currentList.size() > 0) {
+                    mutationBatchList.add(currentList);
+                    currentList = Lists.newArrayList();
+                    currentBatchSizeBytes = 0L;
+                }
+            }
+            currentList.add(mutation);
+            currentBatchSizeBytes += mutationSizeBytes;
+        }
+        if (currentList.size() > 0) {
+            mutationBatchList.add(currentList);
+        }
+        return mutationBatchList;
+
+    }
+
     public byte[] encodeTransaction() throws SQLException {
         try {
             return CODEC.encode(getTransaction());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index c060164..cb2390e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -142,6 +142,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
     private final Properties info;
     private final Map<PDataType<?>, Format> formatters = new HashMap<>();
     private final int mutateBatchSize;
+    private final long mutateBatchSizeBytes;
     private final Long scn;
     private MutationState mutationState;
     private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
@@ -255,6 +256,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
                 this.services.getProps().get(QueryServices.SCHEMA_ATTRIB, QueryServicesOptions.DEFAULT_SCHEMA));
         this.tenantId = tenantId;
         this.mutateBatchSize = JDBCUtil.getMutateBatchSize(url, this.info, this.services.getProps());
+        this.mutateBatchSizeBytes = JDBCUtil.getMutateBatchSizeBytes(url, this.info, this.services.getProps());
         datePattern = this.services.getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
         timePattern = this.services.getProps().get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
         timestampPattern = this.services.getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
@@ -443,7 +445,11 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
     public int getMutateBatchSize() {
         return mutateBatchSize;
     }
-    
+
+    public long getMutateBatchSizeBytes(){ // byte limit per mutation batch, resolved via JDBCUtil.getMutateBatchSizeBytes when the connection is created
+        return mutateBatchSizeBytes;
+    }
+
     public PMetaData getMetaDataCache() {
         return metaData;
     }
@@ -457,7 +463,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
     }
 
     protected MutationState newMutationState(int maxSize) {
-        return new MutationState(maxSize, this); 
+        return new MutationState(maxSize, this);
     }
     
     public MutationState getMutationState() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index 15e55dd..c1db27c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -65,6 +65,7 @@ public class PhoenixIndexImportDirectMapper extends
     private DirectHTableWriter writer;
 
     private int batchSize;
+    private long batchSizeBytes;
 
     private MutationState mutationState;
 
@@ -87,12 +88,16 @@ public class PhoenixIndexImportDirectMapper extends
             }
             connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
             connection.setAutoCommit(false);
-            // Get BatchSize
+            // Get BatchSize, which is in terms of rows
             ConnectionQueryServices services = ((PhoenixConnection) connection).getQueryServices();
             int maxSize =
                     services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
                         QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
             batchSize = Math.min(((PhoenixConnection) connection).getMutateBatchSize(), maxSize);
+
+            //Get batch size in terms of bytes
+            batchSizeBytes = ((PhoenixConnection) connection).getMutateBatchSizeBytes();
+
             LOG.info("Mutation Batch Size = " + batchSize);
 
             final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
@@ -107,8 +112,6 @@ public class PhoenixIndexImportDirectMapper extends
     protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
             throws IOException, InterruptedException {
 
-        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
-
         try {
             final List<Object> values = record.getValues();
             indxWritable.setValues(values);
@@ -119,7 +122,6 @@ public class PhoenixIndexImportDirectMapper extends
             MutationState currentMutationState = pconn.getMutationState();
             if (mutationState == null) {
                 mutationState = currentMutationState;
-                return;
             }
             // Keep accumulating Mutations till batch size
             mutationState.join(currentMutationState);
@@ -137,6 +139,7 @@ public class PhoenixIndexImportDirectMapper extends
             context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
             throw new RuntimeException(e);
         }
+        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
     }
 
     private void writeBatch(MutationState mutationState, Context context) throws IOException,
@@ -144,8 +147,12 @@ public class PhoenixIndexImportDirectMapper extends
         final Iterator<Pair<byte[], List<Mutation>>> iterator = mutationState.toMutations(true, null);
         while (iterator.hasNext()) {
             Pair<byte[], List<Mutation>> mutationPair = iterator.next();
-
-            writer.write(mutationPair.getSecond());
+            List<Mutation> batchMutations = mutationPair.getSecond();
+            List<List<Mutation>> batchOfBatchMutations =
+                MutationState.getMutationBatchList(batchSize, batchSizeBytes, batchMutations);
+            for (List<Mutation> mutationList : batchOfBatchMutations) {
+                writer.write(mutationList);
+            }
             context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(
                 mutationPair.getSecond().size());
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 233007f..2035de8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -86,7 +86,10 @@ public interface QueryServices extends SQLCloseable {
     public static final String CALL_QUEUE_ROUND_ROBIN_ATTRIB = "ipc.server.callqueue.roundrobin";
     public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching";
     public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize";
+
+    @Deprecated // Use MUTATE_BATCH_SIZE_BYTES_ATTRIB instead
     public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
+    public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes"; // max bytes per mutation batch (client and UngroupedAggregateRegionObserver)
     public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
     
     @Deprecated // Use FORCE_ROW_KEY_ORDER instead.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index df203b5..de0796f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -128,7 +128,10 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta data also.
     public static final long DEFAULT_DRIVER_SHUTDOWN_TIMEOUT_MS = 5  * 1000; // Time to wait in ShutdownHook to exit gracefully.
 
-    public final static int DEFAULT_MUTATE_BATCH_SIZE = 1000; // Batch size for UPSERT SELECT and DELETE
+    @Deprecated // Use DEFAULT_MUTATE_BATCH_SIZE_BYTES
+    public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Row-count batch size for UPSERT SELECT and DELETE
+    // Batch size in bytes for UPSERT, UPSERT SELECT and DELETE: 134217728 bytes = 128MB. NOTE(review): confirm 128MB (not 10MB) is the intended default
+    public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 134217728;
 	// The only downside of it being out-of-sync is that the parallelization of the scan won't be as balanced as it could be.
     public static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 30000; // 30 sec (with no activity)
     public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
@@ -449,6 +452,7 @@ public class QueryServicesOptions {
         return set(MAX_MUTATION_SIZE_ATTRIB, maxMutateSize);
     }
 
+    @Deprecated
     public QueryServicesOptions setMutateBatchSize(int mutateBatchSize) {
         return set(MUTATE_BATCH_SIZE_ATTRIB, mutateBatchSize);
     }
@@ -513,6 +517,7 @@ public class QueryServicesOptions {
         return config.getInt(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE);
     }
 
+    @Deprecated
     public int getMutateBatchSize() {
         return config.getInt(MUTATE_BATCH_SIZE_ATTRIB, DEFAULT_MUTATE_BATCH_SIZE);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index f835a21..2cab6fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -133,11 +133,29 @@ public class JDBCUtil {
         return (scnStr == null ? null : Long.parseLong(scnStr));
     }
 
+    @Deprecated // use getMutateBatchSizeBytes
     public static int getMutateBatchSize(String url, Properties info, ReadOnlyProps props) throws SQLException {
         String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB);
         return (batchSizeStr == null ? props.getInt(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE) : Integer.parseInt(batchSizeStr));
     }
 
+    /**
+     * Resolves the mutate batch size in bytes. A value for
+     * {@link PhoenixRuntime#UPSERT_BATCH_SIZE_BYTES_ATTRIB} looked up via the URL and
+     * connection properties takes precedence; otherwise
+     * {@link QueryServices#MUTATE_BATCH_SIZE_BYTES_ATTRIB} is read from {@code props},
+     * falling back to {@link QueryServicesOptions#DEFAULT_MUTATE_BATCH_SIZE_BYTES}.
+     *
+     * @throws NumberFormatException if the URL/connection value is not a valid long
+     */
+    public static long getMutateBatchSizeBytes(String url, Properties info, ReadOnlyProps props) throws SQLException {
+        String bytesOverride = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_BYTES_ATTRIB);
+        if (bytesOverride != null) {
+            return Long.parseLong(bytesOverride);
+        }
+        return props.getLong(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+    }
+
     public static @Nullable PName getTenantId(String url, Properties info) throws SQLException {
         String tenantId = findProperty(url, info, PhoenixRuntime.TENANT_ID_ATTRIB);
         return (tenantId == null ? null : PNameFactory.newName(tenantId));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 3c16d00..5bfb55d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -147,6 +147,16 @@ public class PhoenixRuntime {
     public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";
 
     /**
+     * Use this connection property to control the number of bytes that are
+     * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
+     * Byte-based counterpart of {@link #UPSERT_BATCH_SIZE_ATTRIB}; it's only used
+     * when autoCommit is true and your source table is different from your target
+     * table or your SELECT statement has a GROUP BY clause. If not supplied on the URL
+     * or as a connection property, QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB applies.
+     */
+    public final static String UPSERT_BATCH_SIZE_BYTES_ATTRIB = "UpsertBatchSizeBytes";
+
+    /**
      * Use this connection property to explicitly enable or disable auto-commit on a new connection.
      */
     public static final String AUTO_COMMIT_ATTRIB = "AutoCommit";
@@ -874,7 +884,7 @@ public class PhoenixRuntime {
      * Column names and family names are enclosed in double quotes to allow for case sensitivity and for presence of 
      * special characters. Salting column and view index id column are not included. If the connection is tenant specific 
      * and the table used by the query plan is multi-tenant, then the tenant id column is not included as well.
-     * @param datatypes - Initialized empty list to be filled with the corresponding data type for the columns in @param columns. 
+     * @param dataTypes - Initialized empty list to be filled with the corresponding data type for the columns in @param columns.
      * @param plan - query plan to get info for
      * @param conn - phoenix connection used to generate the query plan. Caller should take care of closing the connection appropriately.
      * @param forDataTable - if true, then column names and data types correspond to the data table even if the query plan uses

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
index 4ab75a9..c87c2db 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
@@ -77,6 +77,20 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest {
             fail("Upsert should have failed since the number of upserts (200) is greater than the MAX_MUTATION_SIZE_ATTRIB (100)");
         } catch (IllegalArgumentException expected) {}
     }
+
+    /**
+     * The byte-based mutate batch size supplied as a connection property should be
+     * reflected both on the connection and on its MutationState.
+     */
+    @Test
+    public void testMutateBatchSizeBytesSetCorrectly() throws Exception {
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "100");
+        try (PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties)) {
+            assertEquals(100L, connection.getMutateBatchSizeBytes());
+            assertEquals(100L, connection.getMutationState().getMaxSizeBytes());
+        }
+    }
     
     @Test
     public void testDisallowNegativeScn() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
index 8c9a8a0..fae7633 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
@@ -27,7 +27,9 @@ import static org.junit.Assert.assertTrue;
 import java.util.Map;
 import java.util.Properties;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.phoenix.query.QueryServices;
 import org.junit.Test;
 
 public class JDBCUtilTest {
@@ -107,7 +109,7 @@ public class JDBCUtilTest {
     @Test
     public void testGetConsistency_TIMELINE_InUrl() {
         assertTrue(JDBCUtil.getConsistencyLevel("localhost;Consistency=TIMELINE", new Properties(),
-                Consistency.STRONG.toString()) == Consistency.TIMELINE);
+            Consistency.STRONG.toString()) == Consistency.TIMELINE); // URL value expected to win over the STRONG argument
     }
 
     @Test
@@ -122,6 +124,27 @@ public class JDBCUtilTest {
         Properties props = new Properties();
         props.setProperty(PhoenixRuntime.CONSISTENCY_ATTRIB, "TIMELINE");
         assertTrue(JDBCUtil.getConsistencyLevel("localhost", props, Consistency.STRONG.toString())
-                == Consistency.TIMELINE);
+            == Consistency.TIMELINE);
+    }
+
+    @Test
+    public void testGetMaxMutateBytes() throws Exception {
+        // Value supplied in the JDBC URL.
+        assertEquals(1000L, JDBCUtil.getMutateBatchSizeBytes("localhost;" + PhoenixRuntime.UPSERT_BATCH_SIZE_BYTES_ATTRIB +
+            "=1000", new Properties(), ReadOnlyProps.EMPTY_PROPS));
+
+        // Value supplied as a connection property.
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.UPSERT_BATCH_SIZE_BYTES_ATTRIB, "2000");
+        assertEquals(2000L, JDBCUtil.getMutateBatchSizeBytes("localhost", props, ReadOnlyProps.EMPTY_PROPS));
+
+        // Absent from URL and connection properties: read from ReadOnlyProps.
+        Map<String, String> propMap = Maps.newHashMap();
+        propMap.put(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "3000");
+        ReadOnlyProps readOnlyProps = new ReadOnlyProps(propMap);
+        assertEquals(3000L, JDBCUtil.getMutateBatchSizeBytes("localhost", new Properties(), readOnlyProps));
+
+        // The connection property takes precedence over ReadOnlyProps.
+        assertEquals(2000L, JDBCUtil.getMutateBatchSizeBytes("localhost", props, readOnlyProps));
     }
 }