You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/01/31 06:18:31 UTC
phoenix git commit: PHOENIX-3271 Distribute UPSERT SELECT across cluster
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.1 29bed889d -> 496f376d2
PHOENIX-3271 Distribute UPSERT SELECT across cluster
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/496f376d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/496f376d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/496f376d
Branch: refs/heads/4.x-HBase-1.1
Commit: 496f376d23e91c01c1d98286b451a4acf765173c
Parents: 29bed88
Author: Ankit Singhal <an...@gmail.com>
Authored: Tue Jan 31 11:48:24 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Tue Jan 31 11:48:24 2017 +0530
----------------------------------------------------------------------
.../phoenix/monitoring/PhoenixMetricsIT.java | 56 ++++-------
.../apache/phoenix/rpc/PhoenixServerRpcIT.java | 6 ++
.../apache/phoenix/compile/UpsertCompiler.java | 38 ++++----
.../UngroupedAggregateRegionObserver.java | 97 ++++++++++++++++++--
.../org/apache/phoenix/schema/PTableImpl.java | 10 ++
5 files changed, 138 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 16a66df..4d075ab 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -439,18 +439,31 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
public void testMetricsForUpsertSelectWithAutoCommit() throws Exception {
String tableName1 = generateUniqueName();
long table1SaltBuckets = 6;
- String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
- + table1SaltBuckets;
+ String ddl = "CREATE TABLE " + tableName1 + " (K BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, V VARCHAR)"
+ + " SALT_BUCKETS = " + table1SaltBuckets + ", IMMUTABLE_ROWS = true";
Connection ddlConn = DriverManager.getConnection(getUrl());
ddlConn.createStatement().execute(ddl);
ddlConn.close();
int numRows = 10;
- insertRowsInTable(tableName1, numRows);
+ String dml = "UPSERT INTO " + tableName1 + " VALUES (?, ?)";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ for (int i = 1; i <= numRows; i++) {
+ stmt.setLong(1, i);
+ stmt.setString(2, "value" + i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ }
String tableName2 = generateUniqueName();
- ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10";
+ ddl = "CREATE TABLE " + tableName2 + " (K BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, V VARCHAR)"
+ + " SALT_BUCKETS = 10" + ", IMMUTABLE_ROWS = true";
ddlConn = DriverManager.getConnection(getUrl());
ddlConn.createStatement().execute(ddl);
+ String indexName = generateUniqueName();
+ ddl = "CREATE INDEX " + indexName + " ON " + tableName2 + " (V)";
+ ddlConn.createStatement().execute(ddl);
ddlConn.close();
String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
@@ -602,41 +615,6 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
- public void testMetricsForUpsertSelectSameTable() throws Exception {
- String tableName = generateUniqueName();
- long table1SaltBuckets = 6;
- String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
- + table1SaltBuckets;
- Connection ddlConn = DriverManager.getConnection(getUrl());
- ddlConn.createStatement().execute(ddl);
- ddlConn.close();
- int numRows = 10;
- insertRowsInTable(tableName, numRows);
-
- Connection conn = DriverManager.getConnection(getUrl());
- conn.setAutoCommit(false);
- String upsertSelect = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName;
- conn.createStatement().executeUpdate(upsertSelect);
- conn.commit();
- PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
-
- Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
- // Because auto-commit is off, upsert select into the same table will run on the client.
- // So we should have client side read and write metrics available.
- assertMutationMetrics(tableName, numRows, mutationMetrics);
- Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
- assertReadMetricsForMutatingSql(tableName, table1SaltBuckets, readMetrics);
- PhoenixRuntime.resetMetrics(pConn);
- // With autocommit on, still, this upsert select runs on the client side.
- conn.setAutoCommit(true);
- conn.createStatement().executeUpdate(upsertSelect);
- Map<String, Map<String, Long>> autoCommitMutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
- Map<String, Map<String, Long>> autoCommitReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
- assertMetricsAreSame(mutationMetrics, autoCommitMutationMetrics, mutationMetricsToSkip);
- assertMetricsAreSame(readMetrics, autoCommitReadMetrics, readMetricsToSkip);
- }
-
- @Test
public void testOpenConnectionsCounter() throws Exception {
long numOpenConnections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue();
try (Connection conn = DriverManager.getConnection(getUrl())) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index 92f7294..410f02c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -153,6 +153,12 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
// verify that that index queue is used only once (for the first upsert)
Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
+
+ TestPhoenixIndexRpcSchedulerFactory.reset();
+ conn.createStatement().execute(
+ "CREATE INDEX " + indexName + "_1 ON " + dataTableFullName + " (v1) INCLUDE (v2)");
+ // verify that the index queue is used, and only once (during Upsert Select on server to build the index)
+ Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
}
finally {
conn.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 32ce6ad..18070d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -85,6 +85,7 @@ import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.ViewType;
@@ -506,7 +507,7 @@ public class UpsertCompiler {
&& tableRefToBe.equals(selectResolver.getTables().get(0));
tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables());
/* We can run the upsert in a coprocessor if:
- * 1) from has only 1 table and the into table matches from table
+ * 1) from has only 1 table
* 2) the select query isn't doing aggregation (which requires a client-side final merge)
* 3) autoCommit is on
* 4) the table is not immutable with indexes, as the client is the one that figures out the additional
@@ -523,9 +524,10 @@ public class UpsertCompiler {
parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe, useServerTimestampToBe);
// If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query,
// so we might be able to run it entirely on the server side.
- // For a table with row timestamp column, we can't guarantee that the row key will reside in the
// region space managed by region servers. So we bail out on executing on server side.
- runOnServer = sameTable && isAutoCommit && !table.isTransactional() && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && table.getRowTimestampColPos() == -1;
+ runOnServer = isAutoCommit && !table.isTransactional()
+ && !(table.isImmutableRows() && !table.getIndexes().isEmpty())
+ && !select.isJoin() && table.getRowTimestampColPos() == -1;
}
// If we may be able to run on the server, add a hint that favors using the data table
// if all else is equal.
@@ -599,7 +601,6 @@ public class UpsertCompiler {
if (valueNodes == null) {
queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe, statement, targetColumns, parallelIteratorFactoryToBe);
projectorToBe = queryPlanToBe.getProjector();
- runOnServer &= queryPlanToBe.getTableRef().equals(tableRefToBe);
}
final List<PColumn> allColumns = allColumnsToBe;
final RowProjector projector = projectorToBe;
@@ -657,41 +658,34 @@ public class UpsertCompiler {
Expression literalNull = LiteralExpression.newConstant(null, column.getDataType(), Determinism.ALWAYS);
projectedExpressions.add(literalNull);
allColumnsIndexes[pos] = column.getPosition();
- }
+ }
// Swap select expression at pos with i
Collections.swap(projectedExpressions, i, pos);
// Swap column indexes and reverse column indexes too
int tempPos = allColumnsIndexes[i];
allColumnsIndexes[i] = allColumnsIndexes[pos];
allColumnsIndexes[pos] = tempPos;
- reverseColumnIndexes[tempPos] = reverseColumnIndexes[i];
+ reverseColumnIndexes[tempPos] = pos;
reverseColumnIndexes[i] = i;
}
- // If any pk slots are changing, be conservative and don't run this server side.
- // If the row ends up living in a different region, we'll get an error otherwise.
- for (int i = 0; i < table.getPKColumns().size(); i++) {
- PColumn column = table.getPKColumns().get(i);
- Expression source = projectedExpressions.get(i);
- if (source == null || !source.equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) {
- // TODO: we could check the region boundaries to see if the pk will still be in it.
- runOnServer = false; // bail on running server side, since PK may be changing
- break;
- }
- }
-
+
////////////////////////////////////////////////////////////////////
// UPSERT SELECT run server-side
/////////////////////////////////////////////////////////////////////
if (runOnServer) {
// Iterate through columns being projected
List<PColumn> projectedColumns = Lists.newArrayListWithExpectedSize(projectedExpressions.size());
- for (int i = 0; i < projectedExpressions.size(); i++) {
+ int posOff = table.getBucketNum() != null ? 1 : 0;
+ for (int i = 0 ; i < projectedExpressions.size(); i++) {
// Must make new column if position has changed
PColumn column = allColumns.get(allColumnsIndexes[i]);
- projectedColumns.add(column.getPosition() == i ? column : new PColumnImpl(column, i));
+ projectedColumns.add(column.getPosition() == i + posOff ? column : new PColumnImpl(column, i));
}
// Build table from projectedColumns
- PTable projectedTable = PTableImpl.makePTable(table, projectedColumns);
+ // Hack to add default column family to be used on server in case no value column is projected.
+ PTable projectedTable = PTableImpl.makePTable(table, projectedColumns,
+ PNameFactory.newName(SchemaUtil.getEmptyColumnFamily(table)));
+
SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint());
StatementContext statementContext = queryPlan.getContext();
@@ -717,7 +711,7 @@ public class UpsertCompiler {
scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions));
// Ignore order by - it has no impact
- final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+ final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
return new MutationPlan() {
@Override
public ParameterMetaData getParameterMetaData() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a888bb2..db3c792 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
@@ -88,12 +89,14 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
import org.apache.phoenix.schema.stats.StatisticsCollector;
@@ -132,8 +135,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// TODO: move all constants into a single class
public static final String UNGROUPED_AGG = "UngroupedAgg";
public static final String DELETE_AGG = "DeleteAgg";
- public static final String UPSERT_SELECT_TABLE = "UpsertSelectTable";
- public static final String UPSERT_SELECT_EXPRS = "UpsertSelectExprs";
public static final String DELETE_CQ = "DeleteCQ";
public static final String DELETE_CF = "DeleteCF";
public static final String EMPTY_CF = "EmptyCF";
@@ -210,6 +211,40 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
}
+
+ private void commitBatchWithHTable(HTable table, Region region, List<Mutation> mutations, byte[] indexUUID,
+ long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] txState) throws IOException {
+
+ if (indexUUID != null) {
+ // Need to add indexMaintainers for each mutation as table.batch can be distributed across servers
+ for (Mutation m : mutations) {
+ if (indexMaintainersPtr != null) {
+ m.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
+ }
+ if (txState != null) {
+ m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+ }
+ m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+ }
+ }
+ // When the memstore size reaches blockingMemstoreSize, we wait up to 3 seconds for a flush to
+ // happen, which decreases the memstore size so that writes are allowed on the region again.
+ for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
+ try {
+ checkForRegionClosing();
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ }
+ logger.debug("Committing batch of " + mutations.size() + " mutations for " + table);
+ try {
+ table.batch(mutations);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
/**
* There is a chance that region might be closing while running balancer/move/merge. In this
@@ -308,12 +343,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
byte[] deleteCQ = null;
byte[] deleteCF = null;
byte[] emptyCF = null;
+ HTable targetHTable = null;
+ boolean areMutationInSameRegion = true;
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
if (upsertSelectTable != null) {
isUpsert = true;
projectedTable = deserializeTable(upsertSelectTable);
+ targetHTable = new HTable(env.getConfiguration(), projectedTable.getPhysicalName().getBytes());
selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
values = new byte[projectedTable.getPKColumns().size()][];
+ areMutationInSameRegion = Bytes.compareTo(targetHTable.getTableName(),
+ region.getTableDesc().getTableName().getName()) == 0
+ && !isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
} else {
byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
@@ -522,10 +563,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
} else if (isUpsert) {
Arrays.fill(values, null);
- int i = 0;
+ int bucketNumOffset = 0;
+ if (projectedTable.getBucketNum() != null) {
+ values[0] = new byte[] { 0 };
+ bucketNumOffset = 1;
+ }
+ int i = bucketNumOffset;
List<PColumn> projectedColumns = projectedTable.getColumns();
for (; i < projectedTable.getPKColumns().size(); i++) {
- Expression expression = selectExpressions.get(i);
+ Expression expression = selectExpressions.get(i - bucketNumOffset);
if (expression.evaluate(result, ptr)) {
values[i] = ptr.copyBytes();
// If SortOrder from expression in SELECT doesn't match the
@@ -535,12 +581,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
SortOrder.invert(values[i], 0, values[i], 0,
values[i].length);
}
+ }else{
+ values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
}
}
projectedTable.newKey(ptr, values);
PRow row = projectedTable.newRow(kvBuilder, ts, ptr, false);
for (; i < projectedColumns.size(); i++) {
- Expression expression = selectExpressions.get(i);
+ Expression expression = selectExpressions.get(i - bucketNumOffset);
if (expression.evaluate(result, ptr)) {
PColumn column = projectedColumns.get(i);
if (!column.getDataType().isSizeCompatible(ptr, null,
@@ -605,8 +653,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
List<List<Mutation>> batchMutationList =
MutationState.getMutationBatchList(batchSize, batchSizeBytes, mutations);
for (List<Mutation> batchMutations : batchMutationList) {
- commitBatch(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
- txState);
+ commit(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
+ txState, areMutationInSameRegion, targetHTable);
batchMutations.clear();
}
mutations.clear();
@@ -624,7 +672,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
} while (hasMore);
if (!mutations.isEmpty()) {
- commitBatch(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState);
+ commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
+ areMutationInSameRegion, targetHTable);
+ mutations.clear();
}
if (!indexMutations.isEmpty()) {
@@ -638,6 +688,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
scansReferenceCount--;
}
}
+ if (targetHTable != null) {
+ targetHTable.close();
+ }
try {
innerScanner.close();
} finally {
@@ -678,8 +731,36 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
};
return scanner;
+
+ }
+
+ private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
+ byte[] indexMaintainersPtr, byte[] txState, boolean areMutationsForSameRegion, HTable hTable)
+ throws IOException {
+ if (!areMutationsForSameRegion) {
+ assert hTable != null;// table cannot be null
+ commitBatchWithHTable(hTable, region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr,
+ txState);
+ } else {
+ commitBatch(region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr, txState);
+ }
+ }
+
+ private boolean isPkPositionChanging(TableRef tableRef, List<Expression> projectedExpressions) throws SQLException {
+ // If any PK slot is changing, the row may end up living in a different region, so report the change.
+ for (int i = 0; i < tableRef.getTable().getPKColumns().size(); i++) {
+ PColumn column = tableRef.getTable().getPKColumns().get(i);
+ Expression source = projectedExpressions.get(i);
+ if (source == null || !source
+ .equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) { return true; }
+ }
+ return false;
}
+ private boolean readyToCommit(List<Mutation> mutations,int batchSize){
+ return !mutations.isEmpty() && batchSize > 0 &&
+ mutations.size() > batchSize;
+ }
@Override
public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
final InternalScanner scanner, final ScanType scanType) throws IOException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 98a0b99..b4e0a06 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -250,6 +250,16 @@ public class PTableImpl implements PTable {
table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
}
+
+ public static PTableImpl makePTable(PTable table, Collection<PColumn> columns, PName defaultFamily) throws SQLException {
+ return new PTableImpl(
+ table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(),
+ table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
+ table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), defaultFamily, table.getViewStatement(),
+ table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+ table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+ }
public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns) throws SQLException {
return new PTableImpl(