Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/11/17 23:45:45 UTC

[4/9] phoenix git commit: PHOENIX-5024 - Cleanup anonymous inner classes in PostDDLCompiler

PHOENIX-5024 - Cleanup anonymous inner classes in PostDDLCompiler


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b493797f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b493797f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b493797f

Branch: refs/heads/omid2
Commit: b493797f67a133a94f83546f29d5a42c0e4e6835
Parents: 89bbdfd
Author: Geoffrey Jacoby <gj...@apache.org>
Authored: Fri Nov 16 09:55:49 2018 -0800
Committer: Geoffrey Jacoby <gj...@apache.org>
Committed: Fri Nov 16 11:03:26 2018 -0800

----------------------------------------------------------------------
 .../apache/phoenix/compile/PostDDLCompiler.java | 478 ++++++++++---------
 1 file changed, 258 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
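
The change below replaces three anonymous inner classes (two ColumnResolver
implementations and a BaseMutationPlan) with named inner classes whose
captured locals become explicit constructor parameters. A minimal,
self-contained sketch of the pattern, using a hypothetical Resolver type
rather than the Phoenix API:

    import java.util.Collections;
    import java.util.List;

    // Illustrative sketch only -- hypothetical Resolver type, not the
    // Phoenix API. It shows the refactoring applied in this commit: an
    // anonymous inner class that captures a local variable becomes a
    // named nested class that receives the same value through its
    // constructor.
    public class AnonymousToNamedExample {

        interface Resolver {
            List<String> getTables();
        }

        // Before: the behavior is buried in an anonymous class at the
        // call site, and the captured local is invisible in any signature.
        static Resolver anonymousResolver(final List<String> tables) {
            return new Resolver() {
                @Override
                public List<String> getTables() {
                    return tables;
                }
            };
        }

        // After: a named static nested class; "static" because it needs
        // no reference to the enclosing instance, and the dependency is
        // explicit in the constructor.
        private static class ListResolver implements Resolver {
            private final List<String> tables;

            ListResolver(List<String> tables) {
                this.tables = tables;
            }

            @Override
            public List<String> getTables() {
                return tables;
            }
        }

        public static void main(String[] args) {
            List<String> tables = Collections.singletonList("T1");
            System.out.println(anonymousResolver(tables).getTables()); // [T1]
            System.out.println(new ListResolver(tables).getTables());  // [T1]
        }
    }

The same shape appears in the diff: MultipleTableRefColumnResolver is
static because it needs no state from the enclosing PostDDLCompiler, while
PostDDLMutationPlan stays non-static so it can keep using the enclosing
compiler's connection field.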


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b493797f/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 709534e..a74c5f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -87,248 +87,286 @@ public class PostDDLCompiler {
             final long timestamp) throws SQLException {
         PhoenixStatement statement = new PhoenixStatement(connection);
         final StatementContext context = new StatementContext(
-                statement, 
-                new ColumnResolver() {
+                statement,
+                new MultipleTableRefColumnResolver(tableRefs),
+                scan,
+                new SequenceManager(statement));
+        return new PostDDLMutationPlan(context, tableRefs, timestamp, emptyCF, deleteList, projectCFs);
+    }
 
-                    @Override
-                    public List<TableRef> getTables() {
-                        return tableRefs;
-                    }
+    private static class MultipleTableRefColumnResolver implements ColumnResolver {
 
-                    @Override
-                    public TableRef resolveTable(String schemaName, String tableName) throws SQLException {
-                        throw new UnsupportedOperationException();
-                    }
+        private final List<TableRef> tableRefs;
 
-                    @Override
-                    public ColumnRef resolveColumn(String schemaName, String tableName, String colName)
-                            throws SQLException {
-                        throw new UnsupportedOperationException();
-                    }
+        public MultipleTableRefColumnResolver(List<TableRef> tableRefs) {
+            this.tableRefs = tableRefs;
+        }
 
-					@Override
-					public List<PFunction> getFunctions() {
-						return Collections.<PFunction>emptyList();
-					}
-
-					@Override
-					public PFunction resolveFunction(String functionName)
-							throws SQLException {
-						throw new FunctionNotFoundException(functionName);
-					}
-
-					@Override
-					public boolean hasUDFs() {
-						return false;
-					}
-
-					@Override
-					public PSchema resolveSchema(String schemaName) throws SQLException {
-						throw new SchemaNotFoundException(schemaName);
-					}
-
-					@Override
-					public List<PSchema> getSchemas() {
-						throw new UnsupportedOperationException();
-					}
-                    
-                },
-                scan,
-                new SequenceManager(statement));
-        return new BaseMutationPlan(context, Operation.UPSERT /* FIXME */) {
-            
-            @Override
-            public MutationState execute() throws SQLException {
-                if (tableRefs.isEmpty()) {
-                    return new MutationState(0, 1000, connection);
-                }
-                boolean wasAutoCommit = connection.getAutoCommit();
-                try {
-                    connection.setAutoCommit(true);
-                    SQLException sqlE = null;
-                    /*
-                     * Handles:
-                     * 1) deletion of all rows for a DROP TABLE and subsequently deletion of all rows for a DROP INDEX;
-                     * 2) deletion of all column values for an ALTER TABLE DROP COLUMN
-                     * 3) updating the necessary rows to have an empty KV
-                     * 4) updating table stats
-                     */
-                    long totalMutationCount = 0;
-                    for (final TableRef tableRef : tableRefs) {
-                        Scan scan = ScanUtil.newScan(context.getScan());
-                        SelectStatement select = SelectStatement.COUNT_ONE;
-                        // We need to use this tableRef
-                        ColumnResolver resolver = new ColumnResolver() {
-                            @Override
-                            public List<TableRef> getTables() {
-                                return Collections.singletonList(tableRef);
-                            }
-                            
-                            @Override
-                            public java.util.List<PFunction> getFunctions() {
-                                return Collections.emptyList();
-                            };
-                            
-                            @Override
-                            public TableRef resolveTable(String schemaName, String tableName)
-                                    throws SQLException {
-                                throw new UnsupportedOperationException();
-                            }
-                            @Override
-                            public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException {
-                                PColumn column = tableName != null
-                                        ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName)
-                                        : tableRef.getTable().getColumnForColumnName(colName);
-                                return new ColumnRef(tableRef, column.getPosition());
-                            }
-                            
-                            @Override
-                            public PFunction resolveFunction(String functionName) throws SQLException {
-                                throw new UnsupportedOperationException();
-                            };
-
-                            @Override
-                            public boolean hasUDFs() {
-                                return false;
-                            }
+        @Override
+        public List<TableRef> getTables() {
+            return tableRefs;
+        }
 
-                            @Override
-                            public List<PSchema> getSchemas() {
-                                throw new UnsupportedOperationException();
-                            }
+        @Override
+        public TableRef resolveTable(String schemaName, String tableName) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
 
-                            @Override
-                            public PSchema resolveSchema(String schemaName) throws SQLException {
-                                throw new SchemaNotFoundException(schemaName);
-                            }
-                        };
-                        PhoenixStatement statement = new PhoenixStatement(connection);
-                        StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
-                        long ts = timestamp;
-                        // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp.
-                        // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions
-                        // in this case, so maybe this is ok.
-                        if (ts!=HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) {
-                            ts = TransactionUtil.convertToNanoseconds(ts);
-                        }
-                        ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts);
-                        if (emptyCF != null) {
-                            scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
-                            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
-                        }
-                        ServerCache cache = null;
-                        try {
-                            if (deleteList != null) {
-                                if (deleteList.isEmpty()) {
-                                    scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
-                                    // In the case of a row deletion, add index metadata so mutable secondary indexing works
-                                    /* TODO: we currently manually run a scan to delete the index data here
-                                    ImmutableBytesWritable ptr = context.getTempPtr();
-                                    tableRef.getTable().getIndexMaintainers(ptr);
-                                    if (ptr.getLength() > 0) {
-                                        IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                                        cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
-                                        byte[] uuidValue = cache.getId();
-                                        scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    }
-                                    */
-                                } else {
-                                    // In the case of the empty key value column family changing, do not send the index
-                                    // metadata, as we're currently managing this from the client. It's possible for the
-                                    // data empty column family to stay the same, while the index empty column family
-                                    // changes.
-                                    PColumn column = deleteList.get(0);
-                                    byte[] cq = column.getColumnQualifierBytes();
-                                    if (emptyCF == null) {
-                                        scan.addColumn(column.getFamilyName().getBytes(), cq);
-                                    }
-                                    scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes());
-                                    scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq);
-                                }
-                            }
-                            List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
-                            if (projectCFs == null) {
-                                for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) {
-                                    columnFamilies.add(family.getName().getBytes());
+        @Override
+        public ColumnRef resolveColumn(String schemaName, String tableName, String colName)
+                throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<PFunction> getFunctions() {
+            return Collections.<PFunction>emptyList();
+        }
+
+        @Override
+        public PFunction resolveFunction(String functionName)
+            throws SQLException {
+            throw new FunctionNotFoundException(functionName);
+        }
+
+        @Override
+        public boolean hasUDFs() {
+            return false;
+        }
+
+        @Override
+        public PSchema resolveSchema(String schemaName) throws SQLException {
+            throw new SchemaNotFoundException(schemaName);
+        }
+
+        @Override
+        public List<PSchema> getSchemas() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+
+    private class PostDDLMutationPlan extends BaseMutationPlan {
+
+        private final StatementContext context;
+        private final List<TableRef> tableRefs;
+        private final long timestamp;
+        private final byte[] emptyCF;
+        private final List<PColumn> deleteList;
+        private final List<byte[]> projectCFs;
+
+        public PostDDLMutationPlan(StatementContext context, List<TableRef> tableRefs, long timestamp, byte[] emptyCF, List<PColumn> deleteList, List<byte[]> projectCFs) {
+            super(context, Operation.UPSERT);
+            this.context = context;
+            this.tableRefs = tableRefs;
+            this.timestamp = timestamp;
+            this.emptyCF = emptyCF;
+            this.deleteList = deleteList;
+            this.projectCFs = projectCFs;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            if (tableRefs.isEmpty()) {
+                return new MutationState(0, 1000, connection);
+            }
+            boolean wasAutoCommit = connection.getAutoCommit();
+            try {
+                connection.setAutoCommit(true);
+                SQLException sqlE = null;
+                /*
+                 * Handles:
+                 * 1) deletion of all rows for a DROP TABLE and subsequently deletion of all rows for a DROP INDEX;
+                 * 2) deletion of all column values for an ALTER TABLE DROP COLUMN
+                 * 3) updating the necessary rows to have an empty KV
+                 * 4) updating table stats
+                 */
+                long totalMutationCount = 0;
+                for (final TableRef tableRef : tableRefs) {
+                    Scan scan = ScanUtil.newScan(context.getScan());
+                    SelectStatement select = SelectStatement.COUNT_ONE;
+                    // We need to use this tableRef
+                    ColumnResolver resolver = new SingleTableRefColumnResolver(tableRef);
+                    PhoenixStatement statement = new PhoenixStatement(connection);
+                    StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
+                    long ts = timestamp;
+                    // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp.
+                    // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions
+                    // in this case, so maybe this is ok.
+                    if (ts != HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) {
+                        ts = TransactionUtil.convertToNanoseconds(ts);
+                    }
+                    ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts);
+                    if (emptyCF != null) {
+                        scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
+                        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
+                    }
+                    ServerCache cache = null;
+                    try {
+                        if (deleteList != null) {
+                            if (deleteList.isEmpty()) {
+                                scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
+                                // In the case of a row deletion, add index metadata so mutable secondary indexing works
+                                /* TODO: we currently manually run a scan to delete the index data here
+                                ImmutableBytesWritable ptr = context.getTempPtr();
+                                tableRef.getTable().getIndexMaintainers(ptr);
+                                if (ptr.getLength() > 0) {
+                                    IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+                                    cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+                                    byte[] uuidValue = cache.getId();
+                                    scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                 }
+                                */
                             } else {
-                                for (byte[] projectCF : projectCFs) {
-                                    columnFamilies.add(projectCF);
+                                // In the case of the empty key value column family changing, do not send the index
+                                // metadata, as we're currently managing this from the client. It's possible for the
+                                // data empty column family to stay the same, while the index empty column family
+                                // changes.
+                                PColumn column = deleteList.get(0);
+                                byte[] cq = column.getColumnQualifierBytes();
+                                if (emptyCF == null) {
+                                    scan.addColumn(column.getFamilyName().getBytes(), cq);
                                 }
+                                scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes());
+                                scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq);
                             }
-                            // Need to project all column families into the scan, since we haven't yet created our empty key value
-                            RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY);
-                            context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
-                            // Explicitly project these column families and don't project the empty key value,
-                            // since at this point we haven't added the empty key value everywhere.
-                            if (columnFamilies != null) {
-                                scan.getFamilyMap().clear();
-                                for (byte[] family : columnFamilies) {
-                                    scan.addFamily(family);
-                                }
-                                projector = new RowProjector(projector,false);
+                        }
+                        List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
+                        if (projectCFs == null) {
+                            for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) {
+                                columnFamilies.add(family.getName().getBytes());
                             }
-                            // Ignore exceptions due to not being able to resolve any view columns,
-                            // as this just means the view is invalid. Continue on and try to perform
-                            // any other Post DDL operations.
-                            try {
-                                // Since dropping a VIEW does not affect the underlying data, we do
-                                // not need to pass through the view statement here.
-                                WhereCompiler.compile(context, select); // Push where clause into scan
-                            } catch (ColumnFamilyNotFoundException e) {
-                                continue;
-                            } catch (ColumnNotFoundException e) {
-                                continue;
-                            } catch (AmbiguousColumnException e) {
-                                continue;
+                        } else {
+                            for (byte[] projectCF : projectCFs) {
+                                columnFamilies.add(projectCF);
+                            }
+                        }
+                        // Need to project all column families into the scan, since we haven't yet created our empty key value
+                        RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY);
+                        context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
+                        // Explicitly project these column families and don't project the empty key value,
+                        // since at this point we haven't added the empty key value everywhere.
+                        if (columnFamilies != null) {
+                            scan.getFamilyMap().clear();
+                            for (byte[] family : columnFamilies) {
+                                scan.addFamily(family);
                             }
-                            QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null,
-                                    OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null);
+                            projector = new RowProjector(projector, false);
+                        }
+                        // Ignore exceptions due to not being able to resolve any view columns,
+                        // as this just means the view is invalid. Continue on and try to perform
+                        // any other Post DDL operations.
+                        try {
+                            // Since dropping a VIEW does not affect the underlying data, we do
+                            // not need to pass through the view statement here.
+                            WhereCompiler.compile(context, select); // Push where clause into scan
+                        } catch (ColumnFamilyNotFoundException e) {
+                            continue;
+                        } catch (ColumnNotFoundException e) {
+                            continue;
+                        } catch (AmbiguousColumnException e) {
+                            continue;
+                        }
+                        QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null,
+                                OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null);
+                        try {
+                            ResultIterator iterator = plan.iterator();
                             try {
-                                ResultIterator iterator = plan.iterator();
+                                Tuple row = iterator.next();
+                                ImmutableBytesWritable ptr = context.getTempPtr();
+                                totalMutationCount += (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
+                            } catch (SQLException e) {
+                                sqlE = e;
+                            } finally {
                                 try {
-                                    Tuple row = iterator.next();
-                                    ImmutableBytesWritable ptr = context.getTempPtr();
-                                    totalMutationCount += (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
+                                    iterator.close();
                                 } catch (SQLException e) {
-                                    sqlE = e;
+                                    if (sqlE == null) {
+                                        sqlE = e;
+                                    } else {
+                                        sqlE.setNextException(e);
+                                    }
                                 } finally {
-                                    try {
-                                        iterator.close();
-                                    } catch (SQLException e) {
-                                        if (sqlE == null) {
-                                            sqlE = e;
-                                        } else {
-                                            sqlE.setNextException(e);
-                                        }
-                                    } finally {
-                                        if (sqlE != null) {
-                                            throw sqlE;
-                                        }
+                                    if (sqlE != null) {
+                                        throw sqlE;
                                     }
                                 }
-                            } catch (TableNotFoundException e) {
-                                // Ignore and continue, as HBase throws when table hasn't been written to
-                                // FIXME: Remove if this is fixed in 0.96
-                            }
-                        } finally {
-                            if (cache != null) { // Remove server cache if there is one
-                                cache.close();
                             }
+                        } catch (TableNotFoundException e) {
+                            // Ignore and continue, as HBase throws when table hasn't been written to
+                            // FIXME: Remove if this is fixed in 0.96
                         }
-                        
-                    }
-                    final long count = totalMutationCount;
-                    return new MutationState(1, 1000, connection) {
-                        @Override
-                        public long getUpdateCount() {
-                            return count;
+                    } finally {
+                        if (cache != null) { // Remove server cache if there is one
+                            cache.close();
                         }
-                    };
-                } finally {
-                    if (!wasAutoCommit) connection.setAutoCommit(wasAutoCommit);
+                    }
+
                 }
+                final long count = totalMutationCount;
+                return new MutationState(1, 1000, connection) {
+                    @Override
+                    public long getUpdateCount() {
+                        return count;
+                    }
+                };
+            } finally {
+                if (!wasAutoCommit) connection.setAutoCommit(wasAutoCommit);
+            }
+        }
+
+        private class SingleTableRefColumnResolver implements ColumnResolver {
+            private final TableRef tableRef;
+
+            public SingleTableRefColumnResolver(TableRef tableRef) {
+                this.tableRef = tableRef;
+            }
+
+            @Override
+            public List<TableRef> getTables() {
+                return Collections.singletonList(tableRef);
+            }
+
+            @Override
+            public List<PFunction> getFunctions() {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public TableRef resolveTable(String schemaName, String tableName)
+                    throws SQLException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException {
+                PColumn column = tableName != null
+                        ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName)
+                        : tableRef.getTable().getColumnForColumnName(colName);
+                return new ColumnRef(tableRef, column.getPosition());
+            }
+
+            @Override
+            public PFunction resolveFunction(String functionName) throws SQLException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public boolean hasUDFs() {
+                return false;
+            }
+
+            @Override
+            public List<PSchema> getSchemas() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public PSchema resolveSchema(String schemaName) throws SQLException {
+                throw new SchemaNotFoundException(schemaName);
             }
-        };
+        }
     }
 }
\ No newline at end of file
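
One detail worth noting in PostDDLMutationPlan.execute() is the nested
try/finally around the result iterator: a failure from iterator.close()
must not mask a failure from the read, so the close-time exception is
chained onto the primary one with setNextException(). A stripped-down,
runnable sketch of that idiom (hypothetical RowIterator type, not a
Phoenix class):

    import java.sql.SQLException;

    // Illustrative sketch only -- hypothetical RowIterator type, not a
    // Phoenix class. It isolates the close-and-chain idiom that execute()
    // preserves: if both the read and close() fail, the close() failure
    // is attached to the primary exception via setNextException() instead
    // of replacing it.
    public class ChainOnCloseExample {

        interface RowIterator {
            long next() throws SQLException;
            void close() throws SQLException;
        }

        static long consume(RowIterator iterator) throws SQLException {
            SQLException sqlE = null;
            try {
                return iterator.next();
            } catch (SQLException e) {
                sqlE = e; // remember the primary failure
            } finally {
                try {
                    iterator.close();
                } catch (SQLException e) {
                    if (sqlE == null) {
                        sqlE = e;                 // close() failure is the only error
                    } else {
                        sqlE.setNextException(e); // keep both, primary first
                    }
                } finally {
                    if (sqlE != null) {
                        throw sqlE;
                    }
                }
            }
            return 0L; // never reached: next() either returned or sqlE was thrown
        }

        public static void main(String[] args) {
            RowIterator failing = new RowIterator() {
                @Override
                public long next() throws SQLException {
                    throw new SQLException("read failed");
                }
                @Override
                public void close() throws SQLException {
                    throw new SQLException("close failed");
                }
            };
            try {
                consume(failing);
            } catch (SQLException e) {
                System.out.println(e.getMessage());                    // read failed
                System.out.println(e.getNextException().getMessage()); // close failed
            }
        }
    }

On Java 7+, try-with-resources plus Throwable.addSuppressed() achieves a
similar effect, but SQLException chaining keeps both errors reachable
through the standard getNextException() walk that JDBC callers already use.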