You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/01/27 01:19:08 UTC
[24/26] phoenix git commit: PHOENIX-541 Make mutable batch size bytes-based instead of row-based (Geoffrey Jacoby)
PHOENIX-541 Make mutable batch size bytes-based instead of row-based (Geoffrey Jacoby)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a44d317e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a44d317e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a44d317e
Branch: refs/heads/calcite
Commit: a44d317e32d383c4964824448bf12dfb8ed8bfec
Parents: b9323e1
Author: Samarth <sa...@salesforce.com>
Authored: Thu Jan 26 15:31:43 2017 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Thu Jan 26 15:31:43 2017 -0800
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/QueryMoreIT.java | 35 ++++++++++++++
.../end2end/UpsertSelectAutoCommitIT.java | 2 +-
.../apache/phoenix/end2end/UpsertSelectIT.java | 2 +-
.../phoenix/end2end/index/ImmutableIndexIT.java | 1 -
.../org/apache/phoenix/tx/TransactionIT.java | 2 +-
.../org/apache/phoenix/tx/TxCheckpointIT.java | 2 +-
.../UngroupedAggregateRegionObserver.java | 31 +++++++-----
.../apache/phoenix/execute/MutationState.java | 51 ++++++++++++++++++--
.../apache/phoenix/jdbc/PhoenixConnection.java | 10 +++-
.../index/PhoenixIndexImportDirectMapper.java | 19 +++++---
.../org/apache/phoenix/query/QueryServices.java | 3 ++
.../phoenix/query/QueryServicesOptions.java | 7 ++-
.../java/org/apache/phoenix/util/JDBCUtil.java | 6 +++
.../org/apache/phoenix/util/PhoenixRuntime.java | 12 ++++-
.../apache/phoenix/jdbc/PhoenixDriverTest.java | 9 ++++
.../org/apache/phoenix/util/JDBCUtilTest.java | 21 +++++++-
16 files changed, 181 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index 2b27f00..a2dab16 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -37,6 +37,8 @@ import java.util.Properties;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
@@ -471,4 +473,37 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
assertFalse(rs.next());
}
}
+
+ @Test
+ public void testMutationBatch() throws Exception {
+ Properties connectionProperties = new Properties();
+ // Deliberately tiny byte threshold (1KB) so the upserted rows cannot all go to HBase
+ // in a single batch call.
+ connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "1024");
+ String fullTableName = generateUniqueName();
+ // try-with-resources releases the connection and statements even if an assertion fails.
+ try (PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties)) {
+ try (Statement stmt = connection.createStatement()) {
+ stmt.execute("CREATE TABLE " + fullTableName + "(\n" +
+ " ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
+ " SCORE DOUBLE NOT NULL,\n" +
+ " ENTITY_ID CHAR(15) NOT NULL\n" +
+ " CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" +
+ " ORGANIZATION_ID,\n" +
+ " SCORE DESC,\n" +
+ " ENTITY_ID DESC\n" +
+ " )\n" +
+ ") MULTI_TENANT=TRUE");
+ }
+ try (PreparedStatement upsert = connection.prepareStatement("upsert into " + fullTableName +
+ " (organization_id, entity_id, score) values (?,?,?)")) {
+ for (int i = 0; i < 4; i++) {
+ upsert.setString(1, "AAAA" + i);
+ upsert.setString(2, "BBBB" + i);
+ upsert.setInt(3, 1);
+ upsert.execute();
+ }
+ // Failures while upserting or committing are no longer swallowed: they must fail the
+ // test directly instead of surfacing as a misleading batch-count mismatch.
+ connection.commit();
+ }
+ // The 1KB threshold is expected to split the commit into exactly two HBase batches.
+ assertEquals(2L, connection.getMutationState().getBatchCount());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
index 37482de..6b781a0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
@@ -152,7 +152,7 @@ public class UpsertSelectAutoCommitIT extends ParallelStatsDisabledIT {
@Test
public void testUpsertSelectDoesntSeeUpsertedData() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+ props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
Connection conn = DriverManager.getConnection(getUrl(), props);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 763f11b..f5905ee 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -1435,7 +1435,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
long ts = nextTimestamp();
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
- props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+ props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
Connection conn = DriverManager.getConnection(getUrl(), props);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 3ee9721..bc301fa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -154,7 +154,6 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
return;
}
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(100));
String tableName = "TBL_" + generateUniqueName();
String indexName = "IND_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 83128f1..bde5cc8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -603,7 +603,7 @@ public class TransactionIT extends ParallelStatsDisabledIT {
@Test
public void testParallelUpsertSelect() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+ props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
Connection conn = DriverManager.getConnection(getUrl(), props);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index 14bcd70..246ecd4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -80,7 +80,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
String seqName = "SEQ_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(tableName, tableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+ props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
Connection conn = getConnection(props);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 9ee0054..a888bb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT;
import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT;
@@ -70,6 +71,7 @@ import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
@@ -343,6 +345,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
int batchSize = 0;
+ long batchSizeBytes = 0L;
List<Mutation> mutations = Collections.emptyList();
boolean needToWrite = false;
Configuration conf = c.getEnvironment().getConfiguration();
@@ -369,6 +372,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// TODO: size better
mutations = Lists.newArrayListWithExpectedSize(1024);
batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+ batchSizeBytes = env.getConfiguration().getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
}
Aggregators aggregators = ServerAggregators.deserialize(
scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), env.getConfiguration());
@@ -596,19 +601,23 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
mutations.add(put);
}
}
- // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
- if (!mutations.isEmpty() && batchSize > 0 &&
- mutations.size() % batchSize == 0) {
- commitBatch(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
- txState);
- mutations.clear();
+ // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
+ List<List<Mutation>> batchMutationList =
+ MutationState.getMutationBatchList(batchSize, batchSizeBytes, mutations);
+ for (List<Mutation> batchMutations : batchMutationList) {
+ commitBatch(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
+ txState);
+ batchMutations.clear();
}
- // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
- if (!indexMutations.isEmpty() && batchSize > 0 &&
- indexMutations.size() % batchSize == 0) {
- commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState);
- indexMutations.clear();
+ mutations.clear();
+ // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
+ List<List<Mutation>> batchIndexMutationList =
+ MutationState.getMutationBatchList(batchSize, batchSizeBytes, indexMutations);
+ for (List<Mutation> batchIndexMutations : batchIndexMutationList) {
+ commitBatch(region, batchIndexMutations, null, blockingMemStoreSize, null, txState);
+ batchIndexMutations.clear();
}
+ indexMutations.clear();
}
aggregators.aggregate(rowAggregators, result);
hasAny = true;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 31ab7c9..4775d59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -126,6 +126,8 @@ public class MutationState implements SQLCloseable {
private final PhoenixConnection connection;
private final long maxSize;
+ private final long maxSizeBytes;
+ private long batchCount = 0L;
private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
private final List<TransactionAware> txAwares;
private final TransactionContext txContext;
@@ -140,7 +142,7 @@ public class MutationState implements SQLCloseable {
private final MutationMetricQueue mutationMetricQueue;
private ReadMetricQueue readMetricQueue;
-
+
public MutationState(long maxSize, PhoenixConnection connection) {
this(maxSize,connection, null, null);
}
@@ -171,6 +173,7 @@ public class MutationState implements SQLCloseable {
Transaction tx, TransactionContext txContext) {
this.maxSize = maxSize;
this.connection = connection;
+ this.maxSizeBytes = connection.getMutateBatchSizeBytes();
this.mutations = mutations;
boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
@@ -743,12 +746,11 @@ public class MutationState implements SQLCloseable {
public static long getMutationTimestamp(final Long tableTimestamp, Long scn) {
return (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn);
}
-
+
/**
* Validates that the meta data is valid against the server meta data if we haven't yet done so.
* Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
* has changed.
- * @param connection
* @return the server time to use for the upsert
* @throws SQLException if the table or any columns no longer exist
*/
@@ -842,6 +844,15 @@ public class MutationState implements SQLCloseable {
}
}
}
+
+ /**
+ * @return the byte threshold used to split mutations into separate HBase batch calls,
+ * taken from the owning connection when this MutationState was created
+ */
+ public long getMaxSizeBytes() {
+ return this.maxSizeBytes;
+ }
+
+ /**
+ * @return how many HBase batch calls this MutationState has issued so far
+ */
+ public long getBatchCount() {
+ return this.batchCount;
+ }
+
private class MetaDataAwareHTable extends DelegateHTable {
private final TableRef tableRef;
@@ -1068,7 +1079,11 @@ public class MutationState implements SQLCloseable {
long startTime = System.currentTimeMillis();
child.addTimelineAnnotation("Attempt " + retryCount);
- hTable.batch(mutationList);
+ List<List<Mutation>> mutationBatchList = getMutationBatchList(maxSize, maxSizeBytes, mutationList);
+ for (List<Mutation> mutationBatch : mutationBatchList) {
+ hTable.batch(mutationBatch);
+ batchCount++;
+ }
if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName));
child.stop();
child.stop();
@@ -1132,6 +1147,34 @@ public class MutationState implements SQLCloseable {
}
}
+ /**
+ * Splits a list of mutations into consecutive, order-preserving sub-lists. A new sub-list is
+ * started when the current one already holds {@code maxSize} mutations, or when adding the
+ * next mutation would push its cumulative {@link Mutation#heapSize()} past
+ * {@code maxSizeBytes}. A single mutation larger than {@code maxSizeBytes} is still emitted,
+ * alone in its own sub-list; empty sub-lists are never produced.
+ * @param maxSize maximum number of mutations per sub-list; values of zero or less impose no row limit
+ * @param maxSizeBytes maximum cumulative heap size, in bytes, per sub-list
+ * @param allMutationList HBase mutations to split; the list itself is not modified
+ * @return list of mutable sub-lists (callers may clear them); empty if the input is empty
+ */
+ public static List<List<Mutation>> getMutationBatchList(long maxSize, long maxSizeBytes, List<Mutation> allMutationList) {
+ List<List<Mutation>> batches = Lists.newArrayList();
+ List<Mutation> batch = Lists.newArrayList();
+ long batchBytes = 0L;
+ for (Mutation m : allMutationList) {
+ long bytes = m.heapSize();
+ boolean rowLimitReached = batch.size() == maxSize;
+ boolean byteLimitExceeded = batchBytes + bytes > maxSizeBytes;
+ // Only flush a non-empty batch, so an oversized mutation still goes out on its own.
+ if (!batch.isEmpty() && (rowLimitReached || byteLimitExceeded)) {
+ batches.add(batch);
+ batch = Lists.newArrayList();
+ batchBytes = 0L;
+ }
+ batch.add(m);
+ batchBytes += bytes;
+ }
+ if (!batch.isEmpty()) {
+ batches.add(batch);
+ }
+ return batches;
+ }
+
public byte[] encodeTransaction() throws SQLException {
try {
return CODEC.encode(getTransaction());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index c060164..cb2390e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -142,6 +142,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
private final Properties info;
private final Map<PDataType<?>, Format> formatters = new HashMap<>();
private final int mutateBatchSize;
+ private final long mutateBatchSizeBytes;
private final Long scn;
private MutationState mutationState;
private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
@@ -255,6 +256,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
this.services.getProps().get(QueryServices.SCHEMA_ATTRIB, QueryServicesOptions.DEFAULT_SCHEMA));
this.tenantId = tenantId;
this.mutateBatchSize = JDBCUtil.getMutateBatchSize(url, this.info, this.services.getProps());
+ this.mutateBatchSizeBytes = JDBCUtil.getMutateBatchSizeBytes(url, this.info, this.services.getProps());
datePattern = this.services.getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
timePattern = this.services.getProps().get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
timestampPattern = this.services.getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
@@ -443,7 +445,11 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
public int getMutateBatchSize() {
return mutateBatchSize;
}
-
+
+ /**
+ * @return the mutation batch size threshold, in bytes, resolved via
+ * {@code JDBCUtil.getMutateBatchSizeBytes} when this connection was constructed
+ */
+ public long getMutateBatchSizeBytes() {
+ return this.mutateBatchSizeBytes;
+ }
+
public PMetaData getMetaDataCache() {
return metaData;
}
@@ -457,7 +463,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
}
protected MutationState newMutationState(int maxSize) {
- return new MutationState(maxSize, this);
+ return new MutationState(maxSize, this);
}
public MutationState getMutationState() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index 15e55dd..c1db27c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -65,6 +65,7 @@ public class PhoenixIndexImportDirectMapper extends
private DirectHTableWriter writer;
private int batchSize;
+ private long batchSizeBytes;
private MutationState mutationState;
@@ -87,12 +88,16 @@ public class PhoenixIndexImportDirectMapper extends
}
connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
connection.setAutoCommit(false);
- // Get BatchSize
+ // Get BatchSize, which is in terms of rows
ConnectionQueryServices services = ((PhoenixConnection) connection).getQueryServices();
int maxSize =
services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
batchSize = Math.min(((PhoenixConnection) connection).getMutateBatchSize(), maxSize);
+
+ //Get batch size in terms of bytes
+ batchSizeBytes = ((PhoenixConnection) connection).getMutateBatchSizeBytes();
+
LOG.info("Mutation Batch Size = " + batchSize);
final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
@@ -107,8 +112,6 @@ public class PhoenixIndexImportDirectMapper extends
protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
throws IOException, InterruptedException {
- context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
-
try {
final List<Object> values = record.getValues();
indxWritable.setValues(values);
@@ -119,7 +122,6 @@ public class PhoenixIndexImportDirectMapper extends
MutationState currentMutationState = pconn.getMutationState();
if (mutationState == null) {
mutationState = currentMutationState;
- return;
}
// Keep accumulating Mutations till batch size
mutationState.join(currentMutationState);
@@ -137,6 +139,7 @@ public class PhoenixIndexImportDirectMapper extends
context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
throw new RuntimeException(e);
}
+ context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
}
private void writeBatch(MutationState mutationState, Context context) throws IOException,
@@ -144,8 +147,12 @@ public class PhoenixIndexImportDirectMapper extends
final Iterator<Pair<byte[], List<Mutation>>> iterator = mutationState.toMutations(true, null);
while (iterator.hasNext()) {
Pair<byte[], List<Mutation>> mutationPair = iterator.next();
-
- writer.write(mutationPair.getSecond());
+ List<Mutation> batchMutations = mutationPair.getSecond();
+ List<List<Mutation>> batchOfBatchMutations =
+ MutationState.getMutationBatchList(batchSize, batchSizeBytes, batchMutations);
+ for (List<Mutation> mutationList : batchOfBatchMutations) {
+ writer.write(mutationList);
+ }
context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(
mutationPair.getSecond().size());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 233007f..2035de8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -86,7 +86,10 @@ public interface QueryServices extends SQLCloseable {
public static final String CALL_QUEUE_ROUND_ROBIN_ATTRIB = "ipc.server.callqueue.roundrobin";
public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching";
public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize";
+
+ @Deprecated //USE MUTATE_BATCH_SIZE_BYTES_ATTRIB instead
public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
+ public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes";
public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
@Deprecated // Use FORCE_ROW_KEY_ORDER instead.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index df203b5..de0796f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -128,7 +128,10 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta data also.
public static final long DEFAULT_DRIVER_SHUTDOWN_TIMEOUT_MS = 5 * 1000; // Time to wait in ShutdownHook to exit gracefully.
- public final static int DEFAULT_MUTATE_BATCH_SIZE = 1000; // Batch size for UPSERT SELECT and DELETE
+ @Deprecated //use DEFAULT_MUTATE_BATCH_SIZE_BYTES
+ public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Batch size for UPSERT SELECT and DELETE
+ // Batch size in bytes for mutations sent to HBase (e.g. UPSERT SELECT and DELETE). By default, 128MB.
+ public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 128L * 1024 * 1024; // == 134217728
// The only downside of it being out-of-sync is that the parallelization of the scan won't be as balanced as it could be.
public static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 30000; // 30 sec (with no activity)
public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
@@ -449,6 +452,7 @@ public class QueryServicesOptions {
return set(MAX_MUTATION_SIZE_ATTRIB, maxMutateSize);
}
+ @Deprecated
public QueryServicesOptions setMutateBatchSize(int mutateBatchSize) {
return set(MUTATE_BATCH_SIZE_ATTRIB, mutateBatchSize);
}
@@ -513,6 +517,7 @@ public class QueryServicesOptions {
return config.getInt(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE);
}
+ @Deprecated
public int getMutateBatchSize() {
return config.getInt(MUTATE_BATCH_SIZE_ATTRIB, DEFAULT_MUTATE_BATCH_SIZE);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index f835a21..2cab6fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -133,11 +133,17 @@ public class JDBCUtil {
return (scnStr == null ? null : Long.parseLong(scnStr));
}
+ @Deprecated // use getMutateBatchSizeBytes
public static int getMutateBatchSize(String url, Properties info, ReadOnlyProps props) throws SQLException {
String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB);
return (batchSizeStr == null ? props.getInt(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE) : Integer.parseInt(batchSizeStr));
}
+ /**
+ * Resolves the mutation batch size threshold, in bytes. If the
+ * {@link PhoenixRuntime#UPSERT_BATCH_SIZE_BYTES_ATTRIB} connection property is present (in the
+ * URL or in {@code info}), it wins; otherwise {@link QueryServices#MUTATE_BATCH_SIZE_BYTES_ATTRIB}
+ * is read from {@code props}, defaulting to {@link QueryServicesOptions#DEFAULT_MUTATE_BATCH_SIZE_BYTES}.
+ * @throws NumberFormatException if the connection property value is not a valid long
+ */
+ public static long getMutateBatchSizeBytes(String url, Properties info, ReadOnlyProps props) throws SQLException {
+ String batchSizeBytesStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_BYTES_ATTRIB);
+ if (batchSizeBytesStr != null) {
+ return Long.parseLong(batchSizeBytesStr);
+ }
+ return props.getLong(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+ }
+
public static @Nullable PName getTenantId(String url, Properties info) throws SQLException {
String tenantId = findProperty(url, info, PhoenixRuntime.TENANT_ID_ATTRIB);
return (tenantId == null ? null : PNameFactory.newName(tenantId));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 3c16d00..5bfb55d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -147,6 +147,16 @@ public class PhoenixRuntime {
public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";
/**
+ * Use this connection property to cap the number of bytes (as measured by HBase
+ * {@code Mutation.heapSize()}) sent to HBase in a single batch call when the client
+ * flushes mutations. It complements rather than overrides a row-based limit: a batch is
+ * closed once it holds the maximum number of rows, or when adding the next mutation
+ * would push it past this byte limit.
+ */
+ public final static String UPSERT_BATCH_SIZE_BYTES_ATTRIB = "UpsertBatchSizeBytes";
+
+
+ /**
* Use this connection property to explicitly enable or disable auto-commit on a new connection.
*/
public static final String AUTO_COMMIT_ATTRIB = "AutoCommit";
@@ -874,7 +884,7 @@ public class PhoenixRuntime {
* Column names and family names are enclosed in double quotes to allow for case sensitivity and for presence of
* special characters. Salting column and view index id column are not included. If the connection is tenant specific
* and the table used by the query plan is multi-tenant, then the tenant id column is not included as well.
- * @param datatypes - Initialized empty list to be filled with the corresponding data type for the columns in @param columns.
+ * @param dataTypes - Initialized empty list to be filled with the corresponding data type for the columns in @param columns.
* @param plan - query plan to get info for
* @param conn - phoenix connection used to generate the query plan. Caller should take care of closing the connection appropriately.
* @param forDataTable - if true, then column names and data types correspond to the data table even if the query plan uses
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
index 4ab75a9..c87c2db 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
@@ -77,6 +77,15 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest {
fail("Upsert should have failed since the number of upserts (200) is greater than the MAX_MUTATION_SIZE_ATTRIB (100)");
} catch (IllegalArgumentException expected) {}
}
+
+ @Test
+ public void testMaxMutationSizeInBytesSetCorrectly() throws Exception {
+ Properties connectionProperties = new Properties();
+ connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "100");
+ // Close the connection even when an assertion fails so the test does not leak it.
+ try (PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties)) {
+ // The configured byte threshold must reach both the connection and its MutationState.
+ assertEquals(100L, connection.getMutateBatchSizeBytes());
+ assertEquals(100L, connection.getMutationState().getMaxSizeBytes());
+ }
+ }
@Test
public void testDisallowNegativeScn() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a44d317e/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
index 8c9a8a0..fae7633 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
@@ -27,7 +27,9 @@ import static org.junit.Assert.assertTrue;
import java.util.Map;
import java.util.Properties;
+import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.phoenix.query.QueryServices;
import org.junit.Test;
public class JDBCUtilTest {
@@ -107,7 +109,7 @@ public class JDBCUtilTest {
@Test
public void testGetConsistency_TIMELINE_InUrl() {
assertTrue(JDBCUtil.getConsistencyLevel("localhost;Consistency=TIMELINE", new Properties(),
- Consistency.STRONG.toString()) == Consistency.TIMELINE);
+ Consistency.STRONG.toString()) == Consistency.TIMELINE);
}
@Test
@@ -122,6 +124,21 @@ public class JDBCUtilTest {
Properties props = new Properties();
props.setProperty(PhoenixRuntime.CONSISTENCY_ATTRIB, "TIMELINE");
assertTrue(JDBCUtil.getConsistencyLevel("localhost", props, Consistency.STRONG.toString())
- == Consistency.TIMELINE);
+ == Consistency.TIMELINE);
+ }
+
+ @Test
+ public void testGetMaxMutateBytes() throws Exception {
+ // 1) Value supplied in the JDBC URL.
+ String urlWithBatchBytes = "localhost;" + PhoenixRuntime.UPSERT_BATCH_SIZE_BYTES_ATTRIB + "=1000";
+ assertEquals(1000L, JDBCUtil.getMutateBatchSizeBytes(urlWithBatchBytes, new Properties(), ReadOnlyProps.EMPTY_PROPS));
+
+ // 2) Value supplied as a connection property.
+ Properties connProps = new Properties();
+ connProps.setProperty(PhoenixRuntime.UPSERT_BATCH_SIZE_BYTES_ATTRIB, "2000");
+ assertEquals(2000L, JDBCUtil.getMutateBatchSizeBytes("localhost", connProps, ReadOnlyProps.EMPTY_PROPS));
+
+ // 3) Value supplied only through the query services configuration.
+ Map<String, String> serviceConfig = Maps.newHashMap();
+ serviceConfig.put(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "3000");
+ assertEquals(3000L, JDBCUtil.getMutateBatchSizeBytes("localhost", new Properties(), new ReadOnlyProps(serviceConfig)));
}
}