Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/20 08:30:41 UTC

[3/3] git commit: PHOENIX-1181 client cache fails to update itself after a table was altered from a diff client

PHOENIX-1181 client cache fails to update itself after a table was altered from a diff client


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5ca432b2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5ca432b2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5ca432b2

Branch: refs/heads/3.0
Commit: 5ca432b2dd4b51c8314a5659a10e33eb4b7693bd
Parents: 23fc482
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Aug 19 23:34:31 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Aug 19 23:34:31 2014 -0700

----------------------------------------------------------------------
 .../apache/phoenix/compile/FromCompiler.java    |   9 +-
 .../apache/phoenix/compile/UpsertCompiler.java  | 201 ++++++++++---------
 2 files changed, 103 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
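
The bug being fixed: one client alters a table while another client still holds the old PTable in its metadata cache, so the second client's statements keep compiling against the stale definition. A minimal reproduction sketch, assuming a local Phoenix instance; the JDBC URL, table name, and columns are illustrative, not taken from the patch:

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class Phoenix1181Repro {
        public static void main(String[] args) throws Exception {
            // Illustrative reproduction; the URL and schema below are made up.
            try (Connection clientA = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Connection clientB = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                clientA.createStatement().execute(
                        "CREATE TABLE T (ID INTEGER PRIMARY KEY, V1 VARCHAR)");
                // Warm client B's metadata cache with the original shape of T.
                clientB.createStatement().executeQuery("SELECT * FROM T").close();
                // Client A alters the table out from under client B.
                clientA.createStatement().execute("ALTER TABLE T ADD V2 VARCHAR");
                // Client B's cached PTable lacks V2. With this patch the compiler
                // refreshes the cache once and retries instead of failing the UPSERT.
                clientB.createStatement().executeUpdate("UPSERT INTO T VALUES (1, 'a', 'b')");
                clientB.commit();
            }
        }
    }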


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5ca432b2/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 02d4f99..efc0973 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -177,19 +177,14 @@ public class FromCompiler {
 
     public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection)
             throws SQLException {
-        return getResolverForMutation(statement, connection, false);
-    }
-    
-    public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection, boolean updateCacheImmediately)
-            throws SQLException {
         /*
          * We validate the meta data at commit time for mutations, as this allows us to do many UPSERT VALUES calls
          * without hitting the server each time to check if the meta data is up-to-date.
          */
-        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), updateCacheImmediately);
+        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false);
         return visitor;
     }
-
+    
     private static class SingleTableColumnResolver extends BaseColumnResolver {
         	private final List<TableRef> tableRefs;
         	private final String alias;
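
With the updateCacheImmediately overload gone, getResolverForMutation always compiles against the cached metadata, and callers recover from staleness by catching MetaDataEntityNotFoundException and refreshing at most once, as UpsertCompiler does below. A condensed sketch of that contract; compileWith() and the enclosing method are placeholders, while the resolver, updateCache(), and exception handling all appear in the patch:

    // Sketch only: compileWith() is a stand-in for the real compilation step.
    private MutationPlan compileMutation(PhoenixConnection connection, DMLStatement dml,
            String schemaName, String tableName) throws SQLException {
        // With auto commit on the cache is updated up front when the resolver
        // is created, so there is nothing to retry; otherwise allow one
        // refresh-and-retry.
        boolean retryOnce = !connection.getAutoCommit();
        while (true) {
            try {
                ColumnResolver resolver = FromCompiler.getResolverForMutation(dml, connection);
                return compileWith(resolver); // hypothetical compile step
            } catch (MetaDataEntityNotFoundException e) {
                if (retryOnce) {
                    retryOnce = false;
                    if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) {
                        continue; // server had newer metadata; recompile against it
                    }
                }
                throw e; // cache was already current, so the entity really is missing
            }
        }
    }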

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5ca432b2/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index d625a9d..b8e1a6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -37,7 +37,6 @@ import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -48,7 +47,6 @@ import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
@@ -224,28 +222,23 @@ public class UpsertCompiler {
         int[] columnIndexesToBe;
         int nColumnsToSet = 0;
         int[] pkSlotIndexesToBe;
+        List<ParseNode> valueNodes = upsert.getValues();
         List<PColumn> targetColumns;
         NamedTableNode tableNode = upsert.getTable();
         String tableName = tableNode.getName().getTableName();
         String schemaName = tableNode.getName().getSchemaName();
-        // Retry once if columns are explicitly named, as the meta data may
+        QueryPlan queryPlanToBe = null;
+        int nValuesToSet;
+        boolean sameTable = false;
+        boolean runOnServer = false;
+        UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null;
+        // Retry once if auto commit is off, as the meta data may
         // be out of date. We do not retry if auto commit is on, as we
         // update the cache up front when we create the resolver in that case.
-        boolean retryOnce = !columnNodes.isEmpty() && !connection.getAutoCommit();
+        boolean retryOnce = !connection.getAutoCommit();
         while (true) {
             try {
-                /*
-                 * Update the cache up front if we don't specify columns. This is going
-                 * to cause a slow down, as every statement execution will check if the
-                 * meta data is out-of-date. There's no way around this, though, as we
-                 * compile in information based on the columns we find when none are
-                 * specified. For example, a column could have been removed and a new
-                 * one added and we may be assuming the wrong type if we don't do this
-                 * now. There's a hole in the existing algorithm too, even if columns
-                 * are specified. If one was removed and then added back with the same
-                 * name, but a different type, we'll run into problems.
-                 */
-                resolver = FromCompiler.getResolverForMutation(upsert, connection, columnNodes.isEmpty());
+                resolver = FromCompiler.getResolverForMutation(upsert, connection);
                 tableRefToBe = resolver.getTables().get(0);
                 table = tableRefToBe.getTable();
                 if (table.getType() == PTableType.VIEW) {
@@ -370,105 +363,113 @@ public class UpsertCompiler {
                         }
                     }
                 }
-                break;
+                boolean isAutoCommit = connection.getAutoCommit();
+                if (valueNodes == null) {
+                    SelectStatement select = upsert.getSelect();
+                    assert(select != null);
+                    select = SubselectRewriter.flatten(select, connection);
+                    ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection);
+                    select = StatementNormalizer.normalize(select, selectResolver);
+                    select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
+                    sameTable = select.getFrom().size() == 1
+                        && tableRefToBe.equals(selectResolver.getTables().get(0));
+                    /* We can run the upsert in a coprocessor if:
+                     * 1) from has only 1 table and the into table matches from table
+                     * 2) the select query isn't doing aggregation (which requires a client-side final merge)
+                     * 3) autoCommit is on
+                     * 4) the table is not immutable with indexes, as the client is the one that figures out the additional
+                     *    puts for index tables.
+                     * 5) no limit clause, as the limit clause requires client-side post processing
+                     * 6) no sequences, as sequences imply that the order of upsert must match the order of
+                     *    selection.
+                     * Otherwise, run the query to pull the data from the server
+                     * and populate the MutationState (up to a limit).
+                    */            
+                    if (! (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) ) {
+                        // We can pipeline the upsert select instead of spooling everything to disk first,
+                        // if we don't have any post processing that's required.
+                        parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe);
+                        // Since we got here, it's not an aggregate, distinct, limited, or sequence-using query,
+                        // so we might be able to run it entirely on the server side.
+                        runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty());
+                    }
+                    // If we may be able to run on the server, add a hint that favors using the data table
+                    // if all else is equal.
+                    // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing,
+                    // as this would disallow running on the server. We currently use the row projector we
+                    // get back to figure this out.
+                    HintNode hint = upsert.getHint();
+                    if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
+                        hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
+                    }
+                    select = SelectStatement.create(select, hint);
+                    // Pass scan through if same table in upsert and select so that projection is computed correctly
+                    // Use optimizer to choose the best plan
+                    try {
+                        QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement));
+                        queryPlanToBe = compiler.compile();
+                    } catch (MetaDataEntityNotFoundException e) {
+                        retryOnce = false; // don't retry if select clause has meta data entities that aren't found, as we already updated the cache
+                        throw e;
+                    }
+                    nValuesToSet = queryPlanToBe.getProjector().getColumnCount();
+                    // Cannot auto commit if doing aggregation or topN or salted
+                    // Salted causes problems because the row may end up living on a different region
+                } else {
+                    nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
+                }
+                // Resize down to allow a subset of columns to be specifiable
+                if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
+                    nColumnsToSet = nValuesToSet;
+                    columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet);
+                    pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet);
+                }
+                
+                if (nValuesToSet != nColumnsToSet) {
+                    // We might have added columns, so refresh cache and try again if stale.
+                    // Note that this check is not really sufficient, as a column could have
+                    // been removed and then added back and we wouldn't detect that here.
+                    if (retryOnce) {
+                        retryOnce = false;
+                        if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) {
+                            continue;
+                        }
+                    }
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.UPSERT_COLUMN_NUMBERS_MISMATCH)
+                        .setMessage("Numbers of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet)
+                        .build().buildException();
+                }
             } catch (MetaDataEntityNotFoundException e) {
                 // Catch column/column family not found exception, as our meta data may
                 // be out of sync. Update the cache once and retry if we were out of sync.
                 // Otherwise throw, as we'll just get the same error next time.
                 if (retryOnce) {
                     retryOnce = false;
-                    MetaDataMutationResult result = new MetaDataClient(connection).updateCache(schemaName, tableName);
-                    if (result.wasUpdated()) {
+                    if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) {
                         continue;
                     }
                 }
                 throw e;
             }
-       }
-        
-        final List<PColumn> allColumns = allColumnsToBe;
-        List<ParseNode> valueNodes = upsert.getValues();
-        QueryPlan plan = null;
-        RowProjector rowProjectorToBe = null;
-        final int nValuesToSet;
-        boolean sameTable = false;
-        boolean runOnServer = false;
-        UpsertingParallelIteratorFactory upsertParallelIteratorFactoryToBe = null;
-        final boolean isAutoCommit = connection.getAutoCommit();
-        if (valueNodes == null) {
-            SelectStatement select = upsert.getSelect();
-            assert(select != null);
-            select = SubselectRewriter.flatten(select, connection);
-            ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection);
-            select = StatementNormalizer.normalize(select, selectResolver);
-            select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
-            sameTable = select.getFrom().size() == 1
-                && tableRefToBe.equals(selectResolver.getTables().get(0));
-            /* We can run the upsert in a coprocessor if:
-             * 1) from has only 1 table and the into table matches from table
-             * 2) the select query isn't doing aggregation (which requires a client-side final merge)
-             * 3) autoCommit is on
-             * 4) the table is not immutable with indexes, as the client is the one that figures out the additional
-             *    puts for index tables.
-             * 5) no limit clause, as the limit clause requires client-side post processing
-             * 6) no sequences, as sequences imply that the order of upsert must match the order of
-             *    selection.
-             * Otherwise, run the query to pull the data from the server
-             * and populate the MutationState (up to a limit).
-            */            
-            ParallelIteratorFactory parallelIteratorFactory;
-            if (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) {
-                parallelIteratorFactory = null;
-            } else {
-                // We can pipeline the upsert select instead of spooling everything to disk first,
-                // if we don't have any post processing that's required.
-                parallelIteratorFactory = upsertParallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe);
-            // If we're in the else, then it's not an aggregate, distinct, limited, or sequence-using query,
-                // so we might be able to run it entirely on the server side.
-                runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty());
-            }
-            // If we may be able to run on the server, add a hint that favors using the data table
-            // if all else is equal.
-            // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing,
-            // as this would disallow running on the server. We currently use the row projector we
-            // get back to figure this out.
-            HintNode hint = upsert.getHint();
-            if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
-                hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
-            }
-            select = SelectStatement.create(select, hint);
-            // Pass scan through if same table in upsert and select so that projection is computed correctly
-            // Use optimizer to choose the best plan 
-            plan = new QueryOptimizer(services).optimize(statement, select, selectResolver, targetColumns, parallelIteratorFactory);
-            runOnServer &= plan.getTableRef().equals(tableRefToBe);
-            rowProjectorToBe = plan.getProjector();
-            nValuesToSet = rowProjectorToBe.getColumnCount();
-            // Cannot auto commit if doing aggregation or topN or salted
-            // Salted causes problems because the row may end up living on a different region
-        } else {
-            nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
-        }
-        final RowProjector projector = rowProjectorToBe;
-        final UpsertingParallelIteratorFactory upsertParallelIteratorFactory = upsertParallelIteratorFactoryToBe;
-        final QueryPlan queryPlan = plan;
-        // Resize down to allow a subset of columns to be specifiable
-        if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
-            nColumnsToSet = nValuesToSet;
-            columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet);
-            pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet);
+            break;
         }
         
-        if (nValuesToSet != nColumnsToSet) {
-            throw new SQLExceptionInfo.Builder(SQLExceptionCode.UPSERT_COLUMN_NUMBERS_MISMATCH)
-                .setMessage("Numbers of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet)
-                .build().buildException();
+        RowProjector projectorToBe = null;
+        // Optimize only after all checks have been performed
+        if (valueNodes == null) {
+            queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe, statement, targetColumns, parallelIteratorFactoryToBe);
+            projectorToBe = queryPlanToBe.getProjector();
+            runOnServer &= queryPlanToBe.getTableRef().equals(tableRefToBe);
         }
-        
+        final List<PColumn> allColumns = allColumnsToBe;
+        final RowProjector projector = projectorToBe;
+        final QueryPlan queryPlan = queryPlanToBe;
         final TableRef tableRef = tableRefToBe;
         final int[] columnIndexes = columnIndexesToBe;
         final int[] pkSlotIndexes = pkSlotIndexesToBe;
         final Set<PColumn> addViewColumns = addViewColumnsToBe;
         final Set<PColumn> overlapViewColumns = overlapViewColumnsToBe;
+        final UpsertingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
         
         // TODO: break this up into multiple functions
         ////////////////////////////////////////////////////////////////////
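
The six numbered conditions in the comment above determine whether an UPSERT SELECT executes inside a coprocessor or falls back to the client. An illustration of statements on either side of that line, assuming MY_TABLE has no indexes over immutable rows; the table name and JDBC URL are made up:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class UpsertSelectPaths {
        public static void main(String[] args) throws Exception {
            // Illustrative only: MY_TABLE and the URL are made up.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.setAutoCommit(true); // condition 3: auto commit must be on
                try (Statement stmt = conn.createStatement()) {
                    // Same table on both sides; no aggregation, DISTINCT, LIMIT, or
                    // sequences: eligible to run entirely inside the coprocessor.
                    stmt.executeUpdate(
                            "UPSERT INTO MY_TABLE(ID, V) SELECT ID, V + 1 FROM MY_TABLE");
                    // LIMIT requires client-side post processing (condition 5), so
                    // the rows are pulled back and upserted from the client instead.
                    stmt.executeUpdate(
                            "UPSERT INTO MY_TABLE(ID, V) SELECT ID, V FROM MY_TABLE LIMIT 10");
                }
            }
        }
    }
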
@@ -648,12 +649,12 @@ public class UpsertCompiler {
                 @Override
                 public MutationState execute() throws SQLException {
                     ResultIterator iterator = queryPlan.iterator();
-                    if (upsertParallelIteratorFactory == null) {
+                    if (parallelIteratorFactory == null) {
                         return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
                     }
-                    upsertParallelIteratorFactory.setRowProjector(projector);
-                    upsertParallelIteratorFactory.setColumnIndexes(columnIndexes);
-                    upsertParallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+                    parallelIteratorFactory.setRowProjector(projector);
+                    parallelIteratorFactory.setColumnIndexes(columnIndexes);
+                    parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
                     Tuple tuple;
                     long totalRowCount = 0;
                     while ((tuple=iterator.next()) != null) {// Runs query