Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/20 08:30:39 UTC

[1/3] git commit: PHOENIX-1181 client cache fails to update itself after a table was altered from a diff client

Repository: phoenix
Updated Branches:
  refs/heads/3.0 91e5d3a1e -> 5ca432b2d


PHOENIX-1181 client cache fails to update itself after a table was altered from a diff client


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/42465628
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/42465628
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/42465628

Branch: refs/heads/3.0
Commit: 42465628683aa933805e3810fe1a299167344337
Parents: 7a61571
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Aug 19 00:49:22 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Aug 19 02:35:59 2014 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/AlterTableIT.java    |  31 +-
 .../phoenix/compile/CreateIndexCompiler.java    |   2 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |  93 ++++--
 .../apache/phoenix/compile/FromCompiler.java    |  19 +-
 .../apache/phoenix/compile/UpsertCompiler.java  | 287 +++++++++++--------
 .../apache/phoenix/execute/MutationState.java   |   7 +-
 .../org/apache/phoenix/parse/DMLStatement.java  |  27 ++
 .../apache/phoenix/parse/DeleteStatement.java   |   2 +-
 .../apache/phoenix/parse/UpsertStatement.java   |   2 +-
 .../apache/phoenix/schema/MetaDataClient.java   |   8 +-
 10 files changed, 317 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
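The heart of the fix is a retry-once pattern in the DML compilers: if column resolution fails with a MetaDataEntityNotFoundException and auto commit is off, the client refreshes its metadata cache from the server and recompiles; if the cache was already current, the original exception is rethrown. A minimal sketch of the pattern, simplified from the DeleteCompiler and UpsertCompiler changes below (stmt, schemaName, and tableName stand in for the compiler's locals):

    boolean retryOnce = !connection.getAutoCommit();
    while (true) {
        try {
            ColumnResolver resolver = FromCompiler.getResolverForMutation(stmt, connection);
            // ... resolve columns and build the mutation plan ...
            break;
        } catch (MetaDataEntityNotFoundException e) {
            // Our cached schema may be stale; refresh it once and retry.
            if (retryOnce) {
                retryOnce = false;
                MetaDataMutationResult result =
                        new MetaDataClient(connection).updateCache(schemaName, tableName);
                if (result.wasUpdated()) {
                    continue;
                }
            }
            throw e; // cache was already up to date, so retrying would fail the same way
        }
    }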


http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 2af697b..ecdee66 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -837,4 +837,33 @@ public class AlterTableIT extends BaseHBaseManagedTimeIT {
         }
     }
    
- }
+    @Test
+    public void alterTableFromDifferentClient() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn3 = DriverManager.getConnection(getUrl(), props);
+
+        // here we insert into the orig schema with one column
+        Connection conn1 = DriverManager.getConnection(getUrl(), props);
+        conn1.createStatement().execute("create table test_simpletable(id VARCHAR PRIMARY KEY, field1 BIGINT)");
+        PreparedStatement stmtInsert1 = conn1.prepareStatement("upsert into test_simpletable (id, field1) values ( ?, ?)");
+        stmtInsert1.setString(1, "key1");
+        stmtInsert1.setLong(2, 1L);
+        stmtInsert1.execute();
+        conn1.commit();
+        stmtInsert1.close();
+        conn1.close();
+
+        // Do the alter through a separate client.
+        conn3.createStatement().execute("alter table test_simpletable add field2 BIGINT");
+        
+        //Connection conn1 = DriverManager.getConnection(getUrl(), props);
+        PreparedStatement pstmt2 = conn1.prepareStatement("upsert into test_simpletable (id, field1, field2) values ( ?, ?, ?)");
+        pstmt2.setString(1, "key2");
+        pstmt2.setLong(2, 2L);
+        pstmt2.setLong(3, 2L);
+        pstmt2.execute();
+        conn1.commit();
+        pstmt2.close();
+        conn1.close();
+    }
+}
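With the fix, the second upsert above succeeds: the first attempt fails against conn1's stale schema, the client refreshes its cache, and the statement is recompiled with field2 present. A hypothetical follow-up check (not part of the committed test) could assert the new column was written:

    ResultSet rs = conn3.createStatement().executeQuery(
            "select field2 from test_simpletable where id = 'key2'");
    assertTrue(rs.next());
    assertEquals(2L, rs.getLong(1));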

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index bbd7154..d6595ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -42,7 +42,7 @@ public class CreateIndexCompiler {
 
     public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
         final PhoenixConnection connection = statement.getConnection();
-        final ColumnResolver resolver = FromCompiler.getResolverForMutation(create, connection);
+        final ColumnResolver resolver = FromCompiler.getResolver(create, connection);
         Scan scan = new Scan();
         final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
         ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
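This commit splits FromCompiler's entry points: getResolver() updates the client cache immediately before resolving (now used by CREATE INDEX here and the MetaDataClient DDL paths below), while getResolverForMutation() resolves against the cached schema and defers validation to commit time, so repeated UPSERT VALUES calls don't round-trip to the server. A sketch of the distinction, using the signatures from the diffs below (variable names are illustrative):

    // DDL and index maintenance: refresh the cache, then resolve
    ColumnResolver ddlResolver = FromCompiler.getResolver(createIndexStatement, connection);

    // DML (UPSERT/DELETE): resolve against the cached schema; validity is
    // re-checked at commit time in MutationState.validate()
    ColumnResolver dmlResolver = FromCompiler.getResolverForMutation(dmlStatement, connection);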

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 002f7b8..469bb30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
@@ -50,6 +51,7 @@ import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.DeleteStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
@@ -58,6 +60,8 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PRow;
@@ -179,39 +183,68 @@ public class DeleteCompiler {
     public MutationPlan compile(DeleteStatement delete) throws SQLException {
         final PhoenixConnection connection = statement.getConnection();
         final boolean isAutoCommit = connection.getAutoCommit();
+        final boolean hasLimit = delete.getLimit() != null;
         final ConnectionQueryServices services = connection.getQueryServices();
-        final ColumnResolver resolver = FromCompiler.getResolverForMutation(delete, connection);
-        final TableRef tableRef = resolver.getTables().get(0);
-        final PTable table = tableRef.getTable();
-        if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
-            throw new ReadOnlyTableException(table.getSchemaName().getString(),table.getTableName().getString());
-        }
+        QueryPlan planToBe = null;
+        NamedTableNode tableNode = delete.getTable();
+        String tableName = tableNode.getName().getTableName();
+        String schemaName = tableNode.getName().getSchemaName();
+        boolean retryOnce = !isAutoCommit;
+        TableRef tableRefToBe;
+        boolean noQueryReqd = false;
+        boolean runOnServer = false;
+        SelectStatement select = null;
+        DeletingParallelIteratorFactory parallelIteratorFactory = null;
+        while (true) {
+            try {
+                ColumnResolver resolver = FromCompiler.getResolverForMutation(delete, connection);
+                tableRefToBe = resolver.getTables().get(0);
+                PTable table = tableRefToBe.getTable();
+                if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
+                    throw new ReadOnlyTableException(table.getSchemaName().getString(),table.getTableName().getString());
+                }
+                
+                noQueryReqd = !hasLimit && !hasImmutableIndex(tableRefToBe);
+                runOnServer = isAutoCommit && noQueryReqd;
+                HintNode hint = delete.getHint();
+                if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
+                    hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
+                }
         
-        final boolean hasLimit = delete.getLimit() != null;
-        boolean noQueryReqd = !hasLimit && !hasImmutableIndex(tableRef);
-        boolean runOnServer = isAutoCommit && noQueryReqd;
-        HintNode hint = delete.getHint();
-        if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
-            hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
-        }
-
-        List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(table.getPKColumns().size());
-        boolean isSalted = table.getBucketNum() != null;
-        boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant();
-        boolean isSharedViewIndex = table.getViewIndexId() != null;
-        for (int i = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0); i < table.getPKColumns().size(); i++) {
-            PColumn column = table.getPKColumns().get(i);
-            aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null)));
+                List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(table.getPKColumns().size());
+                boolean isSalted = table.getBucketNum() != null;
+                boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant();
+                boolean isSharedViewIndex = table.getViewIndexId() != null;
+                for (int i = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0); i < table.getPKColumns().size(); i++) {
+                    PColumn column = table.getPKColumns().get(i);
+                    aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null)));
+                }
+                select = FACTORY.select(
+                        Collections.singletonList(delete.getTable()), 
+                        hint, false, aliasedNodes, delete.getWhere(), 
+                        Collections.<ParseNode>emptyList(), null, 
+                        delete.getOrderBy(), delete.getLimit(),
+                        delete.getBindCount(), false, false);
+                select = StatementNormalizer.normalize(select, resolver);
+                parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection, tableRefToBe);
+                planToBe = new QueryOptimizer(services).optimize(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory);
+            } catch (MetaDataEntityNotFoundException e) {
+                // Catch column/column family not found exception, as our meta data may
+                // be out of sync. Update the cache once and retry if we were out of sync.
+                // Otherwise throw, as we'll just get the same error next time.
+                if (retryOnce) {
+                    retryOnce = false;
+                    MetaDataMutationResult result = new MetaDataClient(connection).updateCache(schemaName, tableName);
+                    if (result.wasUpdated()) {
+                        continue;
+                    }
+                }
+                throw e;
+            }
+            break;
         }
-        SelectStatement select = FACTORY.select(
-                Collections.singletonList(delete.getTable()), 
-                hint, false, aliasedNodes, delete.getWhere(), 
-                Collections.<ParseNode>emptyList(), null, 
-                delete.getOrderBy(), delete.getLimit(),
-                delete.getBindCount(), false, false);
-        select = StatementNormalizer.normalize(select, resolver);
-        DeletingParallelIteratorFactory parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection, tableRef);
-        final QueryPlan plan = new QueryOptimizer(services).optimize(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory);
+        final TableRef tableRef = tableRefToBe;
+        final QueryPlan plan = planToBe;
         if (!plan.getTableRef().equals(tableRef)) {
             runOnServer = false;
             noQueryReqd = false;
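The xxxToBe locals exist because the anonymous MutationPlan returned later in compile() can only capture final locals, while the retry loop may assign them more than once; they are therefore copied into finals once compilation succeeds. In outline:

    TableRef tableRefToBe;      // assigned inside the retry loop, possibly more than once
    QueryPlan planToBe = null;
    // ... while (true) { try { ... } catch (MetaDataEntityNotFoundException e) { ... } } ...
    final TableRef tableRef = tableRefToBe;  // finals, safe to capture in the returned plan
    final QueryPlan plan = planToBe;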

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index b9e0949..02d4f99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.BindTableNode;
 import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.DMLStatement;
 import org.apache.phoenix.parse.DerivedTableNode;
 import org.apache.phoenix.parse.FamilyWildcardParseNode;
 import org.apache.phoenix.parse.JoinTableNode;
@@ -168,12 +169,26 @@ public class FromCompiler {
         return visitor;
     }
     
-    public static ColumnResolver getResolverForMutation(SingleTableStatement statement, PhoenixConnection connection)
+    public static ColumnResolver getResolver(SingleTableStatement statement, PhoenixConnection connection)
             throws SQLException {
-        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false);
+        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true);
         return visitor;
     }
+
+    public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection)
+            throws SQLException {
+        return getResolverForMutation(statement, connection, false);
+    }
     
+    public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection, boolean updateCacheImmediately)
+            throws SQLException {
+        /*
+         * We validate the meta data at commit time for mutations, as this allows us to do many UPSERT VALUES calls
+         * without hitting the server each time to check if the meta data is up-to-date.
+         */
+        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), updateCacheImmediately);
+        return visitor;
+    }
 
     private static class SingleTableColumnResolver extends BaseColumnResolver {
         	private final List<TableRef> tableRefs;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 52cf7d3..d625a9d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -37,6 +37,7 @@ import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -59,6 +60,7 @@ import org.apache.phoenix.parse.ColumnName;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.LiteralParseNode;
+import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SequenceValueParseNode;
@@ -68,6 +70,8 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PDataType;
@@ -207,138 +211,182 @@ public class UpsertCompiler {
         final PhoenixConnection connection = statement.getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
         final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
-        final ColumnResolver resolver = FromCompiler.getResolverForMutation(upsert, connection);
-        final TableRef tableRef = resolver.getTables().get(0);
-        final PTable table = tableRef.getTable();
-        if (table.getType() == PTableType.VIEW) {
-            if (table.getViewType().isReadOnly()) {
-                throw new ReadOnlyTableException(table.getSchemaName().getString(),table.getTableName().getString());
-            }
-        }
-        boolean isSalted = table.getBucketNum() != null;
-        final boolean isTenantSpecific = table.isMultiTenant() && connection.getTenantId() != null;
-        final boolean isSharedViewIndex = table.getViewIndexId() != null;
-        String tenantId = isTenantSpecific ? connection.getTenantId().getString() : null;
-        int posOffset = isSalted ? 1 : 0;
-        // Setup array of column indexes parallel to values that are going to be set
         List<ColumnName> columnNodes = upsert.getColumns();
-        final List<PColumn> allColumns = table.getColumns();
+        TableRef tableRefToBe = null;
+        PTable table = null;
         Set<PColumn> addViewColumnsToBe = Collections.emptySet();
         Set<PColumn> overlapViewColumnsToBe = Collections.emptySet();
-
+        List<PColumn> allColumnsToBe = Collections.emptyList();
+        boolean isTenantSpecific = false;
+        boolean isSharedViewIndex = false;
+        String tenantId = null;
+        ColumnResolver resolver = null;
         int[] columnIndexesToBe;
         int nColumnsToSet = 0;
         int[] pkSlotIndexesToBe;
         List<PColumn> targetColumns;
-        if (table.getViewType() == ViewType.UPDATABLE) {
-            addViewColumnsToBe = Sets.newLinkedHashSetWithExpectedSize(allColumns.size());
-            for (PColumn column : allColumns) {
-                if (column.getViewConstant() != null) {
-                    addViewColumnsToBe.add(column);
-                }
-            }
-        }
-        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        // Allow full row upsert if no columns or only dynamic ones are specified and values count match
-        if (columnNodes.isEmpty() || columnNodes.size() == upsert.getTable().getDynamicColumns().size()) {
-            nColumnsToSet = allColumns.size() - posOffset;
-            columnIndexesToBe = new int[nColumnsToSet];
-            pkSlotIndexesToBe = new int[columnIndexesToBe.length];
-            targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
-            targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
-            int minPKPos = 0;
-            if (isTenantSpecific) {
-                PColumn tenantColumn = table.getPKColumns().get(minPKPos);
-                columnIndexesToBe[minPKPos] = tenantColumn.getPosition();
-                targetColumns.set(minPKPos, tenantColumn);
-                minPKPos++;
-            }
-            if (isSharedViewIndex) {
-                PColumn indexIdColumn = table.getPKColumns().get(minPKPos);
-                columnIndexesToBe[minPKPos] = indexIdColumn.getPosition();
-                targetColumns.set(minPKPos, indexIdColumn);
-                minPKPos++;
-            }
-            for (int i = posOffset, j = 0; i < allColumns.size(); i++) {
-                PColumn column = allColumns.get(i);
-                if (SchemaUtil.isPKColumn(column)) {
-                    pkSlotIndexesToBe[i-posOffset] = j + posOffset;
-                    if (j++ < minPKPos) { // Skip, as it's already been set above
-                        continue;
+        NamedTableNode tableNode = upsert.getTable();
+        String tableName = tableNode.getName().getTableName();
+        String schemaName = tableNode.getName().getSchemaName();
+        // Retry once if columns are explicitly named, as the meta data may
+        // be out of date. We do not retry if auto commit is on, as we
+        // update the cache up front when we create the resolver in that case.
+        boolean retryOnce = !columnNodes.isEmpty() && !connection.getAutoCommit();
+        while (true) {
+            try {
+                /*
+                 * Update the cache up front if we don't specify columns. This is going
+                 * to cause a slow down, as every statement execution will check if the
+                 * meta data is out-of-date. There's no way around this, though, as we
+                 * compile in information based on the columns we find when none are
+                 * specified. For example, a column could have been removed and a new
+                 * one added and we may be assuming the wrong type if we don't do this
+                 * now. There's a hole in the existing algorithm too, even if columns
+                 * are specified. If one was removed and then added back with the same
+                 * name, but a different type, we'll run into problems.
+                 */
+                resolver = FromCompiler.getResolverForMutation(upsert, connection, columnNodes.isEmpty());
+                tableRefToBe = resolver.getTables().get(0);
+                table = tableRefToBe.getTable();
+                if (table.getType() == PTableType.VIEW) {
+                    if (table.getViewType().isReadOnly()) {
+                        throw new ReadOnlyTableException(schemaName,tableName);
                     }
-                    minPKPos = 0;
                 }
-                columnIndexesToBe[i-posOffset+minPKPos] = i;
-                targetColumns.set(i-posOffset+minPKPos, column);
-            }
-            if (!addViewColumnsToBe.isEmpty()) {
-                // All view columns overlap in this case
-                overlapViewColumnsToBe = addViewColumnsToBe;
-                addViewColumnsToBe = Collections.emptySet();
-            }
-        } else {
-            // Size for worse case
-            int numColsInUpsert = columnNodes.size();
-            nColumnsToSet = numColsInUpsert + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) +  + (isSharedViewIndex ? 1 : 0);
-            columnIndexesToBe = new int[nColumnsToSet];
-            pkSlotIndexesToBe = new int[columnIndexesToBe.length];
-            targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
-            targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
-            Arrays.fill(columnIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
-            Arrays.fill(pkSlotIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
-            BitSet pkColumnsSet = new BitSet(table.getPKColumns().size());
-            int i = 0;
-            // Add tenant column directly, as we don't want to resolve it as this will fail
-            if (isTenantSpecific) {
-                PColumn tenantColumn = table.getPKColumns().get(i + posOffset);
-                columnIndexesToBe[i] = tenantColumn.getPosition();
-                pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset);
-                targetColumns.set(i, tenantColumn);
-                i++;
-            }
-            if (isSharedViewIndex) {
-                PColumn indexIdColumn = table.getPKColumns().get(i + posOffset);
-                columnIndexesToBe[i] = indexIdColumn.getPosition();
-                pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset);
-                targetColumns.set(i, indexIdColumn);
-                i++;
-            }
-            for (ColumnName colName : columnNodes) {
-                ColumnRef ref = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName());
-                PColumn column = ref.getColumn();
-                if (IndexUtil.getViewConstantValue(column, ptr)) {
-                    if (overlapViewColumnsToBe.isEmpty()) {
-                        overlapViewColumnsToBe = Sets.newHashSetWithExpectedSize(addViewColumnsToBe.size());
+                boolean isSalted = table.getBucketNum() != null;
+                isTenantSpecific = table.isMultiTenant() && connection.getTenantId() != null;
+                isSharedViewIndex = table.getViewIndexId() != null;
+                tenantId = isTenantSpecific ? connection.getTenantId().getString() : null;
+                int posOffset = isSalted ? 1 : 0;
+                // Setup array of column indexes parallel to values that are going to be set
+                allColumnsToBe = table.getColumns();
+        
+                nColumnsToSet = 0;
+                if (table.getViewType() == ViewType.UPDATABLE) {
+                    addViewColumnsToBe = Sets.newLinkedHashSetWithExpectedSize(allColumnsToBe.size());
+                    for (PColumn column : allColumnsToBe) {
+                        if (column.getViewConstant() != null) {
+                            addViewColumnsToBe.add(column);
+                        }
                     }
-                    nColumnsToSet--;
-                    overlapViewColumnsToBe.add(column);
-                    addViewColumnsToBe.remove(column);
                 }
-                columnIndexesToBe[i] = ref.getColumnPosition();
-                targetColumns.set(i, column);
-                if (SchemaUtil.isPKColumn(column)) {
-                    pkColumnsSet.set(pkSlotIndexesToBe[i] = ref.getPKSlotPosition());
-                }
-                i++;
-            }
-            for (PColumn column : addViewColumnsToBe) {
-                columnIndexesToBe[i] = column.getPosition();
-                targetColumns.set(i, column);
-                if (SchemaUtil.isPKColumn(column)) {
-                    pkColumnsSet.set(pkSlotIndexesToBe[i] = SchemaUtil.getPKPosition(table, column));
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                // Allow full row upsert if no columns or only dynamic ones are specified and values count match
+                if (columnNodes.isEmpty() || columnNodes.size() == upsert.getTable().getDynamicColumns().size()) {
+                    nColumnsToSet = allColumnsToBe.size() - posOffset;
+                    columnIndexesToBe = new int[nColumnsToSet];
+                    pkSlotIndexesToBe = new int[columnIndexesToBe.length];
+                    targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
+                    targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
+                    int minPKPos = 0;
+                    if (isTenantSpecific) {
+                        PColumn tenantColumn = table.getPKColumns().get(minPKPos);
+                        columnIndexesToBe[minPKPos] = tenantColumn.getPosition();
+                        targetColumns.set(minPKPos, tenantColumn);
+                        minPKPos++;
+                    }
+                    if (isSharedViewIndex) {
+                        PColumn indexIdColumn = table.getPKColumns().get(minPKPos);
+                        columnIndexesToBe[minPKPos] = indexIdColumn.getPosition();
+                        targetColumns.set(minPKPos, indexIdColumn);
+                        minPKPos++;
+                    }
+                    for (int i = posOffset, j = 0; i < allColumnsToBe.size(); i++) {
+                        PColumn column = allColumnsToBe.get(i);
+                        if (SchemaUtil.isPKColumn(column)) {
+                            pkSlotIndexesToBe[i-posOffset] = j + posOffset;
+                            if (j++ < minPKPos) { // Skip, as it's already been set above
+                                continue;
+                            }
+                            minPKPos = 0;
+                        }
+                        columnIndexesToBe[i-posOffset+minPKPos] = i;
+                        targetColumns.set(i-posOffset+minPKPos, column);
+                    }
+                    if (!addViewColumnsToBe.isEmpty()) {
+                        // All view columns overlap in this case
+                        overlapViewColumnsToBe = addViewColumnsToBe;
+                        addViewColumnsToBe = Collections.emptySet();
+                    }
+                } else {
+                    // Size for worst case
+                    int numColsInUpsert = columnNodes.size();
+                    nColumnsToSet = numColsInUpsert + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
+                    columnIndexesToBe = new int[nColumnsToSet];
+                    pkSlotIndexesToBe = new int[columnIndexesToBe.length];
+                    targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
+                    targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
+                    Arrays.fill(columnIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
+                    Arrays.fill(pkSlotIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
+                    BitSet pkColumnsSet = new BitSet(table.getPKColumns().size());
+                    int i = 0;
+                    // Add tenant column directly, as we don't want to resolve it as this will fail
+                    if (isTenantSpecific) {
+                        PColumn tenantColumn = table.getPKColumns().get(i + posOffset);
+                        columnIndexesToBe[i] = tenantColumn.getPosition();
+                        pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset);
+                        targetColumns.set(i, tenantColumn);
+                        i++;
+                    }
+                    if (isSharedViewIndex) {
+                        PColumn indexIdColumn = table.getPKColumns().get(i + posOffset);
+                        columnIndexesToBe[i] = indexIdColumn.getPosition();
+                        pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset);
+                        targetColumns.set(i, indexIdColumn);
+                        i++;
+                    }
+                    for (ColumnName colName : columnNodes) {
+                        ColumnRef ref = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName());
+                        PColumn column = ref.getColumn();
+                        if (IndexUtil.getViewConstantValue(column, ptr)) {
+                            if (overlapViewColumnsToBe.isEmpty()) {
+                                overlapViewColumnsToBe = Sets.newHashSetWithExpectedSize(addViewColumnsToBe.size());
+                            }
+                            nColumnsToSet--;
+                            overlapViewColumnsToBe.add(column);
+                            addViewColumnsToBe.remove(column);
+                        }
+                        columnIndexesToBe[i] = ref.getColumnPosition();
+                        targetColumns.set(i, column);
+                        if (SchemaUtil.isPKColumn(column)) {
+                            pkColumnsSet.set(pkSlotIndexesToBe[i] = ref.getPKSlotPosition());
+                        }
+                        i++;
+                    }
+                    for (PColumn column : addViewColumnsToBe) {
+                        columnIndexesToBe[i] = column.getPosition();
+                        targetColumns.set(i, column);
+                        if (SchemaUtil.isPKColumn(column)) {
+                            pkColumnsSet.set(pkSlotIndexesToBe[i] = SchemaUtil.getPKPosition(table, column));
+                        }
+                        i++;
+                    }
+                    for (i = posOffset; i < table.getPKColumns().size(); i++) {
+                        PColumn pkCol = table.getPKColumns().get(i);
+                        if (!pkColumnsSet.get(i)) {
+                            if (!pkCol.isNullable()) {
+                                throw new ConstraintViolationException(table.getName().getString() + "." + pkCol.getName().getString() + " may not be null");
+                            }
+                        }
+                    }
                 }
-                i++;
-            }
-            for (i = posOffset; i < table.getPKColumns().size(); i++) {
-                PColumn pkCol = table.getPKColumns().get(i);
-                if (!pkColumnsSet.get(i)) {
-                    if (!pkCol.isNullable()) {
-                        throw new ConstraintViolationException(table.getName().getString() + "." + pkCol.getName().getString() + " may not be null");
+                break;
+            } catch (MetaDataEntityNotFoundException e) {
+                // Catch column/column family not found exception, as our meta data may
+                // be out of sync. Update the cache once and retry if we were out of sync.
+                // Otherwise throw, as we'll just get the same error next time.
+                if (retryOnce) {
+                    retryOnce = false;
+                    MetaDataMutationResult result = new MetaDataClient(connection).updateCache(schemaName, tableName);
+                    if (result.wasUpdated()) {
+                        continue;
                     }
                 }
+                throw e;
             }
-        }
+       }
         
+        final List<PColumn> allColumns = allColumnsToBe;
         List<ParseNode> valueNodes = upsert.getValues();
         QueryPlan plan = null;
         RowProjector rowProjectorToBe = null;
@@ -355,7 +403,7 @@ public class UpsertCompiler {
             select = StatementNormalizer.normalize(select, selectResolver);
             select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
             sameTable = select.getFrom().size() == 1
-                && tableRef.equals(selectResolver.getTables().get(0));
+                && tableRefToBe.equals(selectResolver.getTables().get(0));
             /* We can run the upsert in a coprocessor if:
              * 1) from has only 1 table and the into table matches from table
              * 2) the select query isn't doing aggregation (which requires a client-side final merge)
@@ -374,7 +422,7 @@ public class UpsertCompiler {
             } else {
                 // We can pipeline the upsert select instead of spooling everything to disk first,
                 // if we don't have any post processing that's required.
-                parallelIteratorFactory = upsertParallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRef);
+                parallelIteratorFactory = upsertParallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe);
+                // If we're in the else, then it's not an aggregate, distinct, limited, or sequence-using query,
                 // so we might be able to run it entirely on the server side.
                 runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty());
@@ -392,7 +440,7 @@ public class UpsertCompiler {
             // Pass scan through if same table in upsert and select so that projection is computed correctly
             // Use optimizer to choose the best plan 
             plan = new QueryOptimizer(services).optimize(statement, select, selectResolver, targetColumns, parallelIteratorFactory);
-            runOnServer &= plan.getTableRef().equals(tableRef);
+            runOnServer &= plan.getTableRef().equals(tableRefToBe);
             rowProjectorToBe = plan.getProjector();
             nValuesToSet = rowProjectorToBe.getColumnCount();
             // Cannot auto commit if doing aggregation or topN or salted
@@ -416,6 +464,7 @@ public class UpsertCompiler {
                 .build().buildException();
         }
         
+        final TableRef tableRef = tableRefToBe;
         final int[] columnIndexes = columnIndexesToBe;
         final int[] pkSlotIndexes = pkSlotIndexesToBe;
         final Set<PColumn> addViewColumns = addViewColumnsToBe;
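The long comment above explains the positional hazard that forces the up-front cache check when an UPSERT names no columns: VALUES are bound to columns by position in the cached schema, so a concurrent ALTER can silently shift the binding. A hypothetical illustration (table t is made up) in the style of the test above:

    // cached schema: t(id VARCHAR PRIMARY KEY, a BIGINT)
    conn.createStatement().execute("upsert into t values ('k', 1)"); // 1 binds to a by position
    // another client drops a and adds b VARCHAR; without refreshing the
    // cache first, the next positional upsert would still compile against
    // the stale BIGINT column
    conn.createStatement().execute("upsert into t values ('k', 1)");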

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 911e3ea..532255f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -262,8 +262,9 @@ public class MutationState implements SQLCloseable {
     }
         
     /**
-     * Validates that the meta data is still valid based on the current server time
-     * and returns the server time to use for the upsert for each table.
+     * Validates that the meta data is valid against the server meta data if we haven't yet done so.
+     * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
+     * has changed.
      * @param connection
      * @return the server time to use for the upsert
      * @throws SQLException if the table or any columns no longer exist
@@ -278,6 +279,8 @@ public class MutationState implements SQLCloseable {
             TableRef tableRef = entry.getKey();
             long serverTimeStamp = tableRef.getTimeStamp();
             PTable table = tableRef.getTable();
+            // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
+            // so no need to do it again here.
             if (!connection.getAutoCommit()) {
                 MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
                 long timestamp = result.getMutationTime();
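In practice this means a connection with auto commit off pays one metadata round-trip per table per commit instead of one per statement. A sketch of the intended usage, reusing the table from the test above (conn and ps are illustrative):

    conn.setAutoCommit(false);
    PreparedStatement ps = conn.prepareStatement(
            "upsert into test_simpletable (id, field1) values (?, ?)");
    for (int i = 0; i < 1000; i++) { // no server metadata checks during these calls
        ps.setString(1, "key" + i);
        ps.setLong(2, i);
        ps.execute();
    }
    conn.commit(); // MutationState.validate() checks each table's schema against the server once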

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/parse/DMLStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DMLStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DMLStatement.java
new file mode 100644
index 0000000..46bd907
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DMLStatement.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+public class DMLStatement extends SingleTableStatement {
+
+    public DMLStatement(NamedTableNode table, int bindCount) {
+        super(table, bindCount);
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
index b15b325..d295e85 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 
-public class DeleteStatement extends SingleTableStatement implements FilterableStatement {
+public class DeleteStatement extends DMLStatement implements FilterableStatement {
     private final ParseNode whereNode;
     private final List<OrderByNode> orderBy;
     private final LimitNode limit;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
index bb23421..fc299d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.parse;
 import java.util.Collections;
 import java.util.List;
 
-public class UpsertStatement extends SingleTableStatement { 
+public class UpsertStatement extends DMLStatement { 
     private final List<ColumnName> columns;
     private final List<ParseNode> values;
     private final SelectStatement select;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 64cf70c..a462a5d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -574,7 +574,7 @@ public class MetaDataClient {
         boolean allocateViewIndexId = false;
         while (true) {
             try {
-                ColumnResolver resolver = FromCompiler.getResolverForMutation(statement, connection);
+                ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
                 tableRef = resolver.getTables().get(0);
                 PTable dataTable = tableRef.getTable();
                 boolean isTenantConnection = connection.getTenantId() != null;
@@ -1508,7 +1508,7 @@ public class MetaDataClient {
             boolean retried = false;
             while (true) {
                 List<Mutation> tableMetaData = Lists.newArrayListWithExpectedSize(2);
-                ColumnResolver resolver = FromCompiler.getResolverForMutation(statement, connection);
+                ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
                 PTable table = resolver.getTables().get(0).getTable();
                 if (logger.isDebugEnabled()) {
                     logger.debug("Resolved table to " + table.getName().getString() + " with seqNum " + table.getSequenceNumber() + " at timestamp " + table.getTimeStamp() + " with " + table.getColumns().size() + " columns: " + table.getColumns());
@@ -1822,7 +1822,7 @@ public class MetaDataClient {
             String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
             boolean retried = false;
             while (true) {
-                final ColumnResolver resolver = FromCompiler.getResolverForMutation(statement, connection);
+                final ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
                 PTable table = resolver.getTables().get(0).getTable();
                 List<ColumnName> columnRefs = statement.getColumnRefs();
                 if(columnRefs == null) {
@@ -1996,7 +1996,7 @@ public class MetaDataClient {
             }
             connection.setAutoCommit(false);
             // Confirm index table is valid and up-to-date
-            TableRef indexRef = FromCompiler.getResolverForMutation(statement, connection).getTables().get(0);
+            TableRef indexRef = FromCompiler.getResolver(statement, connection).getTables().get(0);
             PreparedStatement tableUpsert = null;
             try {
                 if(newIndexState == PIndexState.ACTIVE){


[3/3] git commit: PHOENIX-1181 client cache fails to update itself after a table was altered from a diff client

Posted by ja...@apache.org.
PHOENIX-1181 client cache fails to update itself after a table was altered from a diff client


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5ca432b2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5ca432b2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5ca432b2

Branch: refs/heads/3.0
Commit: 5ca432b2dd4b51c8314a5659a10e33eb4b7693bd
Parents: 23fc482
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Aug 19 23:34:31 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Aug 19 23:34:31 2014 -0700

----------------------------------------------------------------------
 .../apache/phoenix/compile/FromCompiler.java    |   9 +-
 .../apache/phoenix/compile/UpsertCompiler.java  | 201 ++++++++++---------
 2 files changed, 103 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5ca432b2/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 02d4f99..efc0973 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -177,19 +177,14 @@ public class FromCompiler {
 
     public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection)
             throws SQLException {
-        return getResolverForMutation(statement, connection, false);
-    }
-    
-    public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection, boolean updateCacheImmediately)
-            throws SQLException {
         /*
          * We validate the meta data at commit time for mutations, as this allows us to do many UPSERT VALUES calls
          * without hitting the server each time to check if the meta data is up-to-date.
          */
-        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), updateCacheImmediately);
+        SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false);
         return visitor;
     }
-
+    
     private static class SingleTableColumnResolver extends BaseColumnResolver {
         	private final List<TableRef> tableRefs;
         	private final String alias;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5ca432b2/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index d625a9d..b8e1a6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -37,7 +37,6 @@ import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -48,7 +47,6 @@ import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
@@ -224,28 +222,23 @@ public class UpsertCompiler {
         int[] columnIndexesToBe;
         int nColumnsToSet = 0;
         int[] pkSlotIndexesToBe;
+        List<ParseNode> valueNodes = upsert.getValues();
         List<PColumn> targetColumns;
         NamedTableNode tableNode = upsert.getTable();
         String tableName = tableNode.getName().getTableName();
         String schemaName = tableNode.getName().getSchemaName();
-        // Retry once if columns are explicitly named, as the meta data may
+        QueryPlan queryPlanToBe = null;
+        int nValuesToSet;
+        boolean sameTable = false;
+        boolean runOnServer = false;
+        UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null;
+        // Retry once if auto commit is off, as the meta data may
         // be out of date. We do not retry if auto commit is on, as we
         // update the cache up front when we create the resolver in that case.
-        boolean retryOnce = !columnNodes.isEmpty() && !connection.getAutoCommit();
+        boolean retryOnce = !connection.getAutoCommit();
         while (true) {
             try {
-                /*
-                 * Update the cache up front if we don't specify columns. This is going
-                 * to cause a slow down, as every statement execution will check if the
-                 * meta data is out-of-date. There's no way around this, though, as we
-                 * compile in information based on the columns we find when none are
-                 * specified. For example, a column could have been removed and a new
-                 * one added and we may be assuming the wrong type if we don't do this
-                 * now. There's a hole in the existing algorithm too, even if columns
-                 * are specified. If one was removed and then added back with the same
-                 * name, but a different type, we'll run into problems.
-                 */
-                resolver = FromCompiler.getResolverForMutation(upsert, connection, columnNodes.isEmpty());
+                resolver = FromCompiler.getResolverForMutation(upsert, connection);
                 tableRefToBe = resolver.getTables().get(0);
                 table = tableRefToBe.getTable();
                 if (table.getType() == PTableType.VIEW) {
@@ -370,105 +363,113 @@ public class UpsertCompiler {
                         }
                     }
                 }
-                break;
+                boolean isAutoCommit = connection.getAutoCommit();
+                if (valueNodes == null) {
+                    SelectStatement select = upsert.getSelect();
+                    assert(select != null);
+                    select = SubselectRewriter.flatten(select, connection);
+                    ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection);
+                    select = StatementNormalizer.normalize(select, selectResolver);
+                    select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
+                    sameTable = select.getFrom().size() == 1
+                        && tableRefToBe.equals(selectResolver.getTables().get(0));
+                    /* We can run the upsert in a coprocessor if:
+                     * 1) from has only 1 table and the into table matches from table
+                     * 2) the select query isn't doing aggregation (which requires a client-side final merge)
+                     * 3) autoCommit is on
+                     * 4) the table is not immutable with indexes, as the client is the one that figures out the additional
+                     *    puts for index tables.
+                     * 5) no limit clause, as the limit clause requires client-side post processing
+                     * 6) no sequences, as sequences imply that the order of upsert must match the order of
+                     *    selection.
+                     * Otherwise, run the query to pull the data from the server
+                     * and populate the MutationState (up to a limit).
+                    */            
+                    if (! (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) ) {
+                        // We can pipeline the upsert select instead of spooling everything to disk first,
+                        // if we don't have any post processing that's required.
+                        parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe);
+                        // If we're in the else, then it's not an aggregate, distinct, limited, or sequence-using query,
+                        // so we might be able to run it entirely on the server side.
+                        runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty());
+                    }
+                    // If we may be able to run on the server, add a hint that favors using the data table
+                    // if all else is equal.
+                    // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing,
+                    // as this would disallow running on the server. We currently use the row projector we
+                    // get back to figure this out.
+                    HintNode hint = upsert.getHint();
+                    if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
+                        hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
+                    }
+                    select = SelectStatement.create(select, hint);
+                    // Pass scan through if same table in upsert and select so that projection is computed correctly
+                    // Use optimizer to choose the best plan
+                    try {
+                        QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement));
+                        queryPlanToBe = compiler.compile();
+                    } catch (MetaDataEntityNotFoundException e) {
+                        retryOnce = false; // don't retry if the select clause references metadata entities that aren't found, as we already updated the cache
+                        throw e;
+                    }
+                    nValuesToSet = queryPlanToBe.getProjector().getColumnCount();
+                    // Cannot auto-commit if the query does aggregation or topN, or the table is salted.
+                    // Salting causes problems because the upserted row may end up living on a different region.
+                } else {
+                    nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
+                }
+                // Resize down to allow a subset of columns to be specifiable
+                if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
+                    nColumnsToSet = nValuesToSet;
+                    columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet);
+                    pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet);
+                }
+                
+                if (nValuesToSet != nColumnsToSet) {
+                    // We might have added columns, so refresh the cache and try again if stale.
+                    // Note that this check is not really sufficient, as a column could have
+                    // been removed and then added back, and we wouldn't detect that here.
+                    if (retryOnce) {
+                        retryOnce = false;
+                        if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) {
+                            continue;
+                        }
+                    }
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.UPSERT_COLUMN_NUMBERS_MISMATCH)
+                        .setMessage("Number of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet)
+                        .build().buildException();
+                }
             } catch (MetaDataEntityNotFoundException e) {
                 // Catch column/column family not found exception, as our meta data may
                 // be out of sync. Update the cache once and retry if we were out of sync.
                 // Otherwise throw, as we'll just get the same error next time.
                 if (retryOnce) {
                     retryOnce = false;
-                    MetaDataMutationResult result = new MetaDataClient(connection).updateCache(schemaName, tableName);
-                    if (result.wasUpdated()) {
+                    if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) {
                         continue;
                     }
                 }
                 throw e;
             }
-       }
-        
-        final List<PColumn> allColumns = allColumnsToBe;
-        List<ParseNode> valueNodes = upsert.getValues();
-        QueryPlan plan = null;
-        RowProjector rowProjectorToBe = null;
-        final int nValuesToSet;
-        boolean sameTable = false;
-        boolean runOnServer = false;
-        UpsertingParallelIteratorFactory upsertParallelIteratorFactoryToBe = null;
-        final boolean isAutoCommit = connection.getAutoCommit();
-        if (valueNodes == null) {
-            SelectStatement select = upsert.getSelect();
-            assert(select != null);
-            select = SubselectRewriter.flatten(select, connection);
-            ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection);
-            select = StatementNormalizer.normalize(select, selectResolver);
-            select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
-            sameTable = select.getFrom().size() == 1
-                && tableRefToBe.equals(selectResolver.getTables().get(0));
-            /* We can run the upsert in a coprocessor if:
-             * 1) from has only 1 table and the into table matches from table
-             * 2) the select query isn't doing aggregation (which requires a client-side final merge)
-             * 3) autoCommit is on
-             * 4) the table is not immutable with indexes, as the client is the one that figures out the additional
-             *    puts for index tables.
-             * 5) no limit clause, as the limit clause requires client-side post processing
-             * 6) no sequences, as sequences imply that the order of upsert must match the order of
-             *    selection.
-             * Otherwise, run the query to pull the data from the server
-             * and populate the MutationState (up to a limit).
-            */            
-            ParallelIteratorFactory parallelIteratorFactory;
-            if (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) {
-                parallelIteratorFactory = null;
-            } else {
-                // We can pipeline the upsert select instead of spooling everything to disk first,
-                // if we don't have any post processing that's required.
-                parallelIteratorFactory = upsertParallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe);
-                // If we're in the else, then it's not an aggregate, distinct, limited, or sequence-using query,
-                // so we might be able to run it entirely on the server side.
-                runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty());
-            }
-            // If we may be able to run on the server, add a hint that favors using the data table
-            // if all else is equal.
-            // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing,
-            // as this would disallow running on the server. We currently use the row projector we
-            // get back to figure this out.
-            HintNode hint = upsert.getHint();
-            if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
-                hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
-            }
-            select = SelectStatement.create(select, hint);
-            // Pass scan through if same table in upsert and select so that projection is computed correctly
-            // Use optimizer to choose the best plan 
-            plan = new QueryOptimizer(services).optimize(statement, select, selectResolver, targetColumns, parallelIteratorFactory);
-            runOnServer &= plan.getTableRef().equals(tableRefToBe);
-            rowProjectorToBe = plan.getProjector();
-            nValuesToSet = rowProjectorToBe.getColumnCount();
-            // Cannot auto commit if doing aggregation or topN or salted
-            // Salted causes problems because the row may end up living on a different region
-        } else {
-            nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
-        }
-        final RowProjector projector = rowProjectorToBe;
-        final UpsertingParallelIteratorFactory upsertParallelIteratorFactory = upsertParallelIteratorFactoryToBe;
-        final QueryPlan queryPlan = plan;
-        // Resize down to allow a subset of columns to be specifiable
-        if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
-            nColumnsToSet = nValuesToSet;
-            columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet);
-            pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet);
+            break;
         }
         
-        if (nValuesToSet != nColumnsToSet) {
-            throw new SQLExceptionInfo.Builder(SQLExceptionCode.UPSERT_COLUMN_NUMBERS_MISMATCH)
-                .setMessage("Number of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet)
-                .build().buildException();
+        RowProjector projectorToBe = null;
+        // Optimize only after all checks have been performed
+        if (valueNodes == null) {
+            queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe, statement, targetColumns, parallelIteratorFactoryToBe);
+            projectorToBe = queryPlanToBe.getProjector();
+            runOnServer &= queryPlanToBe.getTableRef().equals(tableRefToBe);
         }
-        
+        final List<PColumn> allColumns = allColumnsToBe;
+        final RowProjector projector = projectorToBe;
+        final QueryPlan queryPlan = queryPlanToBe;
         final TableRef tableRef = tableRefToBe;
         final int[] columnIndexes = columnIndexesToBe;
         final int[] pkSlotIndexes = pkSlotIndexesToBe;
         final Set<PColumn> addViewColumns = addViewColumnsToBe;
         final Set<PColumn> overlapViewColumns = overlapViewColumnsToBe;
+        final UpsertingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
         
         // TODO: break this up into multiple functions
         ////////////////////////////////////////////////////////////////////
@@ -648,12 +649,12 @@ public class UpsertCompiler {
                 @Override
                 public MutationState execute() throws SQLException {
                     ResultIterator iterator = queryPlan.iterator();
-                    if (upsertParallelIteratorFactory == null) {
+                    if (parallelIteratorFactory == null) {
                         return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
                     }
-                    upsertParallelIteratorFactory.setRowProjector(projector);
-                    upsertParallelIteratorFactory.setColumnIndexes(columnIndexes);
-                    upsertParallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+                    parallelIteratorFactory.setRowProjector(projector);
+                    parallelIteratorFactory.setColumnIndexes(columnIndexes);
+                    parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
                     Tuple tuple;
                     long totalRowCount = 0;
                     while ((tuple=iterator.next()) != null) {// Runs query

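The heart of this fix is the compile/retry loop added above: the UPSERT is compiled against the client's cached metadata, and on a column-count mismatch or a MetaDataEntityNotFoundException the cache is refreshed once via MetaDataClient.updateCache() before a single recompile. Below is a minimal sketch of that pattern; compileUpsertOnce() is a hypothetical placeholder for the compilation body, not a Phoenix API, while MetaDataClient.updateCache() and wasUpdated() are taken directly from the patch.

    // Hedged sketch of the retry-once-on-stale-metadata pattern used above.
    // compileUpsertOnce() is a placeholder standing in for the real
    // compilation logic; connection, schemaName, and tableName are givens.
    boolean retryOnce = true;
    while (true) {
        try {
            compileUpsertOnce(connection, schemaName, tableName); // may see a stale PTable
            break;
        } catch (MetaDataEntityNotFoundException e) {
            // Our cached table may predate an ALTER TABLE issued by another
            // client. Refresh the cache once and recompile; otherwise rethrow,
            // since the same failure would just repeat.
            if (retryOnce) {
                retryOnce = false;
                if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) {
                    continue;
                }
            }
            throw e;
        }
    }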

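As a worked illustration of the server-side eligibility conditions enumerated in the comment block above, consider the following sketch. The table and column names are hypothetical, not from the patch; it assumes auto-commit is on and the target table is mutable with no indexes.

    // Assumes autoCommit is on and FOO is a mutable table with no indexes.
    Connection conn = DriverManager.getConnection(url, props);
    conn.setAutoCommit(true);

    // Eligible for server-side execution: single source table matching the
    // target, no aggregation or DISTINCT, no LIMIT, no sequences.
    conn.createStatement().execute(
        "UPSERT INTO foo(id, val) SELECT id, val + 1 FROM foo");

    // Not eligible: the LIMIT clause requires client-side post-processing,
    // so rows are pulled back and applied through the client's MutationState.
    conn.createStatement().execute(
        "UPSERT INTO foo(id, val) SELECT id, val + 1 FROM foo LIMIT 10");
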
[2/3] git commit: Merge branch '3.0' of https://git-wip-us.apache.org/repos/asf/phoenix into 3.0

Posted by ja...@apache.org.
Merge branch '3.0' of https://git-wip-us.apache.org/repos/asf/phoenix into 3.0


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/23fc4825
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/23fc4825
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/23fc4825

Branch: refs/heads/3.0
Commit: 23fc4825083479d190fb6d0a9e3daee9a05d5238
Parents: 4246562 91e5d3a
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Aug 19 20:51:46 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Aug 19 20:51:46 2014 -0700

----------------------------------------------------------------------
 CHANGES                                         |   9 +-
 dev/make_rc.sh                                  |   2 +-
 .../apache/phoenix/end2end/DerivedTableIT.java  |  13 ++
 .../org/apache/phoenix/end2end/HashJoinIT.java  |  49 ++++
 .../MutatingParallelIteratorFactory.java        |   3 +-
 .../apache/phoenix/compile/QueryCompiler.java   |  13 +-
 .../phoenix/compile/StatementContext.java       |  23 --
 .../apache/phoenix/execute/AggregatePlan.java   |   9 +-
 .../phoenix/iterate/ChunkedResultIterator.java  |  27 +--
 .../phoenix/iterate/ParallelIterators.java      |   7 +-
 .../phoenix/iterate/SpoolingResultIterator.java |   3 +-
 .../RoundFloorCeilExpressionsTest.java          | 230 +++++++++++++++++++
 .../RoundFloorCeilExpressionsUnitTests.java     | 230 -------------------
 .../phoenix/query/KeyRangeCoalesceTest.java     | 161 +++++++++++++
 .../phoenix/query/KeyRangeCoalesceTests.java    | 161 -------------
 .../phoenix/query/KeyRangeIntersectTest.java    |  97 ++++++++
 .../phoenix/query/KeyRangeIntersectTests.java   |  97 --------
 .../apache/phoenix/query/KeyRangeUnionTest.java |  97 ++++++++
 .../phoenix/query/KeyRangeUnionTests.java       |  97 --------
 19 files changed, 689 insertions(+), 639 deletions(-)
----------------------------------------------------------------------