You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/20 08:30:41 UTC
[3/3] git commit: PHOENIX-1181 client cache fails to update itself
after a table was altered from a diff client
PHOENIX-1181 client cache fails to update itself after a table was altered from a diff client
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5ca432b2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5ca432b2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5ca432b2
Branch: refs/heads/3.0
Commit: 5ca432b2dd4b51c8314a5659a10e33eb4b7693bd
Parents: 23fc482
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Aug 19 23:34:31 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Aug 19 23:34:31 2014 -0700
----------------------------------------------------------------------
.../apache/phoenix/compile/FromCompiler.java | 9 +-
.../apache/phoenix/compile/UpsertCompiler.java | 201 ++++++++++---------
2 files changed, 103 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5ca432b2/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 02d4f99..efc0973 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -177,19 +177,14 @@ public class FromCompiler {
public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection)
throws SQLException {
- return getResolverForMutation(statement, connection, false);
- }
-
- public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection, boolean updateCacheImmediately)
- throws SQLException {
/*
* We validate the meta data at commit time for mutations, as this allows us to do many UPSERT VALUES calls
* without hitting the server each time to check if the meta data is up-to-date.
*/
- SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), updateCacheImmediately);
+ SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false);
return visitor;
}
-
+
private static class SingleTableColumnResolver extends BaseColumnResolver {
private final List<TableRef> tableRefs;
private final String alias;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5ca432b2/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index d625a9d..b8e1a6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -37,7 +37,6 @@ import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -48,7 +47,6 @@ import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
@@ -224,28 +222,23 @@ public class UpsertCompiler {
int[] columnIndexesToBe;
int nColumnsToSet = 0;
int[] pkSlotIndexesToBe;
+ List<ParseNode> valueNodes = upsert.getValues();
List<PColumn> targetColumns;
NamedTableNode tableNode = upsert.getTable();
String tableName = tableNode.getName().getTableName();
String schemaName = tableNode.getName().getSchemaName();
- // Retry once if columns are explicitly named, as the meta data may
+ QueryPlan queryPlanToBe = null;
+ int nValuesToSet;
+ boolean sameTable = false;
+ boolean runOnServer = false;
+ UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null;
+ // Retry once if auto commit is off, as the meta data may
// be out of date. We do not retry if auto commit is on, as we
// update the cache up front when we create the resolver in that case.
- boolean retryOnce = !columnNodes.isEmpty() && !connection.getAutoCommit();
+ boolean retryOnce = !connection.getAutoCommit();
while (true) {
try {
- /*
- * Update the cache up front if we don't specify columns. This is going
- * to cause a slow down, as every statement execution will check if the
- * meta data is out-of-date. There's no way around this, though, as we
- * compile in information based on the columns we find when none are
- * specified. For example, a column could have been removed and a new
- * one added and we may be assuming the wrong type if we don't do this
- * now. There's a hole in the existing algorithm too, even if columns
- * are specified. If one was removed and then added back with the same
- * name, but a different type, we'll run into problems.
- */
- resolver = FromCompiler.getResolverForMutation(upsert, connection, columnNodes.isEmpty());
+ resolver = FromCompiler.getResolverForMutation(upsert, connection);
tableRefToBe = resolver.getTables().get(0);
table = tableRefToBe.getTable();
if (table.getType() == PTableType.VIEW) {
@@ -370,105 +363,113 @@ public class UpsertCompiler {
}
}
}
- break;
+ boolean isAutoCommit = connection.getAutoCommit();
+ if (valueNodes == null) {
+ SelectStatement select = upsert.getSelect();
+ assert(select != null);
+ select = SubselectRewriter.flatten(select, connection);
+ ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection);
+ select = StatementNormalizer.normalize(select, selectResolver);
+ select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
+ sameTable = select.getFrom().size() == 1
+ && tableRefToBe.equals(selectResolver.getTables().get(0));
+ /* We can run the upsert in a coprocessor if:
+ * 1) from has only 1 table and the into table matches from table
+ * 2) the select query isn't doing aggregation (which requires a client-side final merge)
+ * 3) autoCommit is on
+ * 4) the table is not immutable with indexes, as the client is the one that figures out the additional
+ * puts for index tables.
+ * 5) no limit clause, as the limit clause requires client-side post processing
+ * 6) no sequences, as sequences imply that the order of upsert must match the order of
+ * selection.
+ * Otherwise, run the query to pull the data from the server
+ * and populate the MutationState (up to a limit).
+ */
+ if (! (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) ) {
+ // We can pipeline the upsert select instead of spooling everything to disk first,
+ // if we don't have any post processing that's required.
+ parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe);
+ // If we get here, then it's not an aggregate, distinct, limited, or sequence-using query,
+ // so we might be able to run it entirely on the server side.
+ runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty());
+ }
+ // If we may be able to run on the server, add a hint that favors using the data table
+ // if all else is equal.
+ // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing,
+ // as this would disallow running on the server. We currently use the row projector we
+ // get back to figure this out.
+ HintNode hint = upsert.getHint();
+ if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
+ hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
+ }
+ select = SelectStatement.create(select, hint);
+ // Pass scan through if same table in upsert and select so that projection is computed correctly
+ // Compile the plan here; the optimizer chooses the best plan later, once all checks have passed
+ try {
+ QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement));
+ queryPlanToBe = compiler.compile();
+ } catch (MetaDataEntityNotFoundException e) {
+ retryOnce = false; // don't retry if select clause has meta data entities that aren't found, as we already updated the cache
+ throw e;
+ }
+ nValuesToSet = queryPlanToBe.getProjector().getColumnCount();
+ // Cannot auto commit if doing aggregation or topN or salted
+ // Salted causes problems because the row may end up living on a different region
+ } else {
+ nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
+ }
+ // Resize down to allow a subset of columns to be specifiable
+ if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
+ nColumnsToSet = nValuesToSet;
+ columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet);
+ pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet);
+ }
+
+ if (nValuesToSet != nColumnsToSet) {
+ // We might have added columns, so refresh cache and try again if stale.
+ // Note that this check is not really sufficient, as a column could have
+ // been removed and then added back and we wouldn't detect that here.
+ if (retryOnce) {
+ retryOnce = false;
+ if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) {
+ continue;
+ }
+ }
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.UPSERT_COLUMN_NUMBERS_MISMATCH)
+ .setMessage("Numbers of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet)
+ .build().buildException();
+ }
} catch (MetaDataEntityNotFoundException e) {
// Catch column/column family not found exception, as our meta data may
// be out of sync. Update the cache once and retry if we were out of sync.
// Otherwise throw, as we'll just get the same error next time.
if (retryOnce) {
retryOnce = false;
- MetaDataMutationResult result = new MetaDataClient(connection).updateCache(schemaName, tableName);
- if (result.wasUpdated()) {
+ if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) {
continue;
}
}
throw e;
}
- }
-
- final List<PColumn> allColumns = allColumnsToBe;
- List<ParseNode> valueNodes = upsert.getValues();
- QueryPlan plan = null;
- RowProjector rowProjectorToBe = null;
- final int nValuesToSet;
- boolean sameTable = false;
- boolean runOnServer = false;
- UpsertingParallelIteratorFactory upsertParallelIteratorFactoryToBe = null;
- final boolean isAutoCommit = connection.getAutoCommit();
- if (valueNodes == null) {
- SelectStatement select = upsert.getSelect();
- assert(select != null);
- select = SubselectRewriter.flatten(select, connection);
- ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection);
- select = StatementNormalizer.normalize(select, selectResolver);
- select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
- sameTable = select.getFrom().size() == 1
- && tableRefToBe.equals(selectResolver.getTables().get(0));
- /* We can run the upsert in a coprocessor if:
- * 1) from has only 1 table and the into table matches from table
- * 2) the select query isn't doing aggregation (which requires a client-side final merge)
- * 3) autoCommit is on
- * 4) the table is not immutable with indexes, as the client is the one that figures out the additional
- * puts for index tables.
- * 5) no limit clause, as the limit clause requires client-side post processing
- * 6) no sequences, as sequences imply that the order of upsert must match the order of
- * selection.
- * Otherwise, run the query to pull the data from the server
- * and populate the MutationState (upto a limit).
- */
- ParallelIteratorFactory parallelIteratorFactory;
- if (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) {
- parallelIteratorFactory = null;
- } else {
- // We can pipeline the upsert select instead of spooling everything to disk first,
- // if we don't have any post processing that's required.
- parallelIteratorFactory = upsertParallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe);
- // If we're in the else, then it's not an aggregate, distinct, limted, or sequence using query,
- // so we might be able to run it entirely on the server side.
- runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty());
- }
- // If we may be able to run on the server, add a hint that favors using the data table
- // if all else is equal.
- // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing,
- // as this would disallow running on the server. We currently use the row projector we
- // get back to figure this out.
- HintNode hint = upsert.getHint();
- if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
- hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
- }
- select = SelectStatement.create(select, hint);
- // Pass scan through if same table in upsert and select so that projection is computed correctly
- // Use optimizer to choose the best plan
- plan = new QueryOptimizer(services).optimize(statement, select, selectResolver, targetColumns, parallelIteratorFactory);
- runOnServer &= plan.getTableRef().equals(tableRefToBe);
- rowProjectorToBe = plan.getProjector();
- nValuesToSet = rowProjectorToBe.getColumnCount();
- // Cannot auto commit if doing aggregation or topN or salted
- // Salted causes problems because the row may end up living on a different region
- } else {
- nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
- }
- final RowProjector projector = rowProjectorToBe;
- final UpsertingParallelIteratorFactory upsertParallelIteratorFactory = upsertParallelIteratorFactoryToBe;
- final QueryPlan queryPlan = plan;
- // Resize down to allow a subset of columns to be specifiable
- if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
- nColumnsToSet = nValuesToSet;
- columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet);
- pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet);
+ break;
}
- if (nValuesToSet != nColumnsToSet) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.UPSERT_COLUMN_NUMBERS_MISMATCH)
- .setMessage("Numbers of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet)
- .build().buildException();
+ RowProjector projectorToBe = null;
+ // Optimize only after all checks have been performed
+ if (valueNodes == null) {
+ queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe, statement, targetColumns, parallelIteratorFactoryToBe);
+ projectorToBe = queryPlanToBe.getProjector();
+ runOnServer &= queryPlanToBe.getTableRef().equals(tableRefToBe);
}
-
+ final List<PColumn> allColumns = allColumnsToBe;
+ final RowProjector projector = projectorToBe;
+ final QueryPlan queryPlan = queryPlanToBe;
final TableRef tableRef = tableRefToBe;
final int[] columnIndexes = columnIndexesToBe;
final int[] pkSlotIndexes = pkSlotIndexesToBe;
final Set<PColumn> addViewColumns = addViewColumnsToBe;
final Set<PColumn> overlapViewColumns = overlapViewColumnsToBe;
+ final UpsertingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
// TODO: break this up into multiple functions
////////////////////////////////////////////////////////////////////
@@ -648,12 +649,12 @@ public class UpsertCompiler {
@Override
public MutationState execute() throws SQLException {
ResultIterator iterator = queryPlan.iterator();
- if (upsertParallelIteratorFactory == null) {
+ if (parallelIteratorFactory == null) {
return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
}
- upsertParallelIteratorFactory.setRowProjector(projector);
- upsertParallelIteratorFactory.setColumnIndexes(columnIndexes);
- upsertParallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+ parallelIteratorFactory.setRowProjector(projector);
+ parallelIteratorFactory.setColumnIndexes(columnIndexes);
+ parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
Tuple tuple;
long totalRowCount = 0;
while ((tuple=iterator.next()) != null) {// Runs query