You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/11/17 23:45:45 UTC
[4/9] phoenix git commit: PHOENIX-5024 - Cleanup anonymous inner classes in PostDDLCompiler
PHOENIX-5024 - Cleanup anonymous inner classes in PostDDLCompiler
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b493797f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b493797f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b493797f
Branch: refs/heads/omid2
Commit: b493797f67a133a94f83546f29d5a42c0e4e6835
Parents: 89bbdfd
Author: Geoffrey Jacoby <gj...@apache.org>
Authored: Fri Nov 16 09:55:49 2018 -0800
Committer: Geoffrey Jacoby <gj...@apache.org>
Committed: Fri Nov 16 11:03:26 2018 -0800
----------------------------------------------------------------------
.../apache/phoenix/compile/PostDDLCompiler.java | 478 ++++++++++---------
1 file changed, 258 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b493797f/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 709534e..a74c5f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -87,248 +87,286 @@ public class PostDDLCompiler {
final long timestamp) throws SQLException {
PhoenixStatement statement = new PhoenixStatement(connection);
final StatementContext context = new StatementContext(
- statement,
- new ColumnResolver() {
+ statement,
+ new MultipleTableRefColumnResolver(tableRefs),
+ scan,
+ new SequenceManager(statement));
+ return new PostDDLMutationPlan(context, tableRefs, timestamp, emptyCF, deleteList, projectCFs);
+ }
- @Override
- public List<TableRef> getTables() {
- return tableRefs;
- }
+ private static class MultipleTableRefColumnResolver implements ColumnResolver {
- @Override
- public TableRef resolveTable(String schemaName, String tableName) throws SQLException {
- throw new UnsupportedOperationException();
- }
+ private final List<TableRef> tableRefs;
- @Override
- public ColumnRef resolveColumn(String schemaName, String tableName, String colName)
- throws SQLException {
- throw new UnsupportedOperationException();
- }
+ public MultipleTableRefColumnResolver(List<TableRef> tableRefs) {
+ this.tableRefs = tableRefs;
+ }
- @Override
- public List<PFunction> getFunctions() {
- return Collections.<PFunction>emptyList();
- }
-
- @Override
- public PFunction resolveFunction(String functionName)
- throws SQLException {
- throw new FunctionNotFoundException(functionName);
- }
-
- @Override
- public boolean hasUDFs() {
- return false;
- }
-
- @Override
- public PSchema resolveSchema(String schemaName) throws SQLException {
- throw new SchemaNotFoundException(schemaName);
- }
-
- @Override
- public List<PSchema> getSchemas() {
- throw new UnsupportedOperationException();
- }
-
- },
- scan,
- new SequenceManager(statement));
- return new BaseMutationPlan(context, Operation.UPSERT /* FIXME */) {
-
- @Override
- public MutationState execute() throws SQLException {
- if (tableRefs.isEmpty()) {
- return new MutationState(0, 1000, connection);
- }
- boolean wasAutoCommit = connection.getAutoCommit();
- try {
- connection.setAutoCommit(true);
- SQLException sqlE = null;
- /*
- * Handles:
- * 1) deletion of all rows for a DROP TABLE and subsequently deletion of all rows for a DROP INDEX;
- * 2) deletion of all column values for a ALTER TABLE DROP COLUMN
- * 3) updating the necessary rows to have an empty KV
- * 4) updating table stats
- */
- long totalMutationCount = 0;
- for (final TableRef tableRef : tableRefs) {
- Scan scan = ScanUtil.newScan(context.getScan());
- SelectStatement select = SelectStatement.COUNT_ONE;
- // We need to use this tableRef
- ColumnResolver resolver = new ColumnResolver() {
- @Override
- public List<TableRef> getTables() {
- return Collections.singletonList(tableRef);
- }
-
- @Override
- public java.util.List<PFunction> getFunctions() {
- return Collections.emptyList();
- };
-
- @Override
- public TableRef resolveTable(String schemaName, String tableName)
- throws SQLException {
- throw new UnsupportedOperationException();
- }
- @Override
- public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException {
- PColumn column = tableName != null
- ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName)
- : tableRef.getTable().getColumnForColumnName(colName);
- return new ColumnRef(tableRef, column.getPosition());
- }
-
- @Override
- public PFunction resolveFunction(String functionName) throws SQLException {
- throw new UnsupportedOperationException();
- };
-
- @Override
- public boolean hasUDFs() {
- return false;
- }
+ @Override
+ public List<TableRef> getTables() {
+ return tableRefs;
+ }
- @Override
- public List<PSchema> getSchemas() {
- throw new UnsupportedOperationException();
- }
+ @Override
+ public TableRef resolveTable(String schemaName, String tableName) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
- @Override
- public PSchema resolveSchema(String schemaName) throws SQLException {
- throw new SchemaNotFoundException(schemaName);
- }
- };
- PhoenixStatement statement = new PhoenixStatement(connection);
- StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
- long ts = timestamp;
- // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp.
- // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions
- // in this case, so maybe this is ok.
- if (ts!=HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) {
- ts = TransactionUtil.convertToNanoseconds(ts);
- }
- ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts);
- if (emptyCF != null) {
- scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
- scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
- }
- ServerCache cache = null;
- try {
- if (deleteList != null) {
- if (deleteList.isEmpty()) {
- scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
- // In the case of a row deletion, add index metadata so mutable secondary indexing works
- /* TODO: we currently manually run a scan to delete the index data here
- ImmutableBytesWritable ptr = context.getTempPtr();
- tableRef.getTable().getIndexMaintainers(ptr);
- if (ptr.getLength() > 0) {
- IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
- cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
- byte[] uuidValue = cache.getId();
- scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- }
- */
- } else {
- // In the case of the empty key value column family changing, do not send the index
- // metadata, as we're currently managing this from the client. It's possible for the
- // data empty column family to stay the same, while the index empty column family
- // changes.
- PColumn column = deleteList.get(0);
- byte[] cq = column.getColumnQualifierBytes();
- if (emptyCF == null) {
- scan.addColumn(column.getFamilyName().getBytes(), cq);
- }
- scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes());
- scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq);
- }
- }
- List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
- if (projectCFs == null) {
- for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) {
- columnFamilies.add(family.getName().getBytes());
+ @Override
+ public ColumnRef resolveColumn(String schemaName, String tableName, String colName)
+ throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<PFunction> getFunctions() {
+ return Collections.<PFunction>emptyList();
+ }
+
+ @Override
+ public PFunction resolveFunction(String functionName)
+ throws SQLException {
+ throw new FunctionNotFoundException(functionName);
+ }
+
+ @Override
+ public boolean hasUDFs() {
+ return false;
+ }
+
+ @Override
+ public PSchema resolveSchema(String schemaName) throws SQLException {
+ throw new SchemaNotFoundException(schemaName);
+ }
+
+ @Override
+ public List<PSchema> getSchemas() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ private class PostDDLMutationPlan extends BaseMutationPlan {
+
+ private final StatementContext context;
+ private final List<TableRef> tableRefs;
+ private final long timestamp;
+ private final byte[] emptyCF;
+ private final List<PColumn> deleteList;
+ private final List<byte[]> projectCFs;
+
+ public PostDDLMutationPlan(StatementContext context, List<TableRef> tableRefs, long timestamp, byte[] emptyCF, List<PColumn> deleteList, List<byte[]> projectCFs) {
+ super(context, Operation.UPSERT);
+ this.context = context;
+ this.tableRefs = tableRefs;
+ this.timestamp = timestamp;
+ this.emptyCF = emptyCF;
+ this.deleteList = deleteList;
+ this.projectCFs = projectCFs;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ if (tableRefs.isEmpty()) {
+ return new MutationState(0, 1000, connection);
+ }
+ boolean wasAutoCommit = connection.getAutoCommit();
+ try {
+ connection.setAutoCommit(true);
+ SQLException sqlE = null;
+ /*
+ * Handles:
+ * 1) deletion of all rows for a DROP TABLE and subsequently deletion of all rows for a DROP INDEX;
+ * 2) deletion of all column values for a ALTER TABLE DROP COLUMN
+ * 3) updating the necessary rows to have an empty KV
+ * 4) updating table stats
+ */
+ long totalMutationCount = 0;
+ for (final TableRef tableRef : tableRefs) {
+ Scan scan = ScanUtil.newScan(context.getScan());
+ SelectStatement select = SelectStatement.COUNT_ONE;
+ // We need to use this tableRef
+ ColumnResolver resolver = new SingleTableRefColumnResolver(tableRef);
+ PhoenixStatement statement = new PhoenixStatement(connection);
+ StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
+ long ts = timestamp;
+ // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp.
+ // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions
+ // in this case, so maybe this is ok.
+ if (ts!= HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) {
+ ts = TransactionUtil.convertToNanoseconds(ts);
+ }
+ ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts);
+ if (emptyCF != null) {
+ scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
+ scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
+ }
+ ServerCache cache = null;
+ try {
+ if (deleteList != null) {
+ if (deleteList.isEmpty()) {
+ scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
+ // In the case of a row deletion, add index metadata so mutable secondary indexing works
+ /* TODO: we currently manually run a scan to delete the index data here
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ tableRef.getTable().getIndexMaintainers(ptr);
+ if (ptr.getLength() > 0) {
+ IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+ cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+ byte[] uuidValue = cache.getId();
+ scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
}
+ */
} else {
- for (byte[] projectCF : projectCFs) {
- columnFamilies.add(projectCF);
+ // In the case of the empty key value column family changing, do not send the index
+ // metadata, as we're currently managing this from the client. It's possible for the
+ // data empty column family to stay the same, while the index empty column family
+ // changes.
+ PColumn column = deleteList.get(0);
+ byte[] cq = column.getColumnQualifierBytes();
+ if (emptyCF == null) {
+ scan.addColumn(column.getFamilyName().getBytes(), cq);
}
+ scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes());
+ scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq);
}
- // Need to project all column families into the scan, since we haven't yet created our empty key value
- RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY);
- context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
- // Explicitly project these column families and don't project the empty key value,
- // since at this point we haven't added the empty key value everywhere.
- if (columnFamilies != null) {
- scan.getFamilyMap().clear();
- for (byte[] family : columnFamilies) {
- scan.addFamily(family);
- }
- projector = new RowProjector(projector,false);
+ }
+ List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
+ if (projectCFs == null) {
+ for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) {
+ columnFamilies.add(family.getName().getBytes());
}
- // Ignore exceptions due to not being able to resolve any view columns,
- // as this just means the view is invalid. Continue on and try to perform
- // any other Post DDL operations.
- try {
- // Since dropping a VIEW does not affect the underlying data, we do
- // not need to pass through the view statement here.
- WhereCompiler.compile(context, select); // Push where clause into scan
- } catch (ColumnFamilyNotFoundException e) {
- continue;
- } catch (ColumnNotFoundException e) {
- continue;
- } catch (AmbiguousColumnException e) {
- continue;
+ } else {
+ for (byte[] projectCF : projectCFs) {
+ columnFamilies.add(projectCF);
+ }
+ }
+ // Need to project all column families into the scan, since we haven't yet created our empty key value
+ RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY);
+ context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
+ // Explicitly project these column families and don't project the empty key value,
+ // since at this point we haven't added the empty key value everywhere.
+ if (columnFamilies != null) {
+ scan.getFamilyMap().clear();
+ for (byte[] family : columnFamilies) {
+ scan.addFamily(family);
}
- QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null,
- OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null);
+ projector = new RowProjector(projector,false);
+ }
+ // Ignore exceptions due to not being able to resolve any view columns,
+ // as this just means the view is invalid. Continue on and try to perform
+ // any other Post DDL operations.
+ try {
+ // Since dropping a VIEW does not affect the underlying data, we do
+ // not need to pass through the view statement here.
+ WhereCompiler.compile(context, select); // Push where clause into scan
+ } catch (ColumnFamilyNotFoundException e) {
+ continue;
+ } catch (ColumnNotFoundException e) {
+ continue;
+ } catch (AmbiguousColumnException e) {
+ continue;
+ }
+ QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null,
+ OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null);
+ try {
+ ResultIterator iterator = plan.iterator();
try {
- ResultIterator iterator = plan.iterator();
+ Tuple row = iterator.next();
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ totalMutationCount += (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
+ } catch (SQLException e) {
+ sqlE = e;
+ } finally {
try {
- Tuple row = iterator.next();
- ImmutableBytesWritable ptr = context.getTempPtr();
- totalMutationCount += (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
+ iterator.close();
} catch (SQLException e) {
- sqlE = e;
+ if (sqlE == null) {
+ sqlE = e;
+ } else {
+ sqlE.setNextException(e);
+ }
} finally {
- try {
- iterator.close();
- } catch (SQLException e) {
- if (sqlE == null) {
- sqlE = e;
- } else {
- sqlE.setNextException(e);
- }
- } finally {
- if (sqlE != null) {
- throw sqlE;
- }
+ if (sqlE != null) {
+ throw sqlE;
}
}
- } catch (TableNotFoundException e) {
- // Ignore and continue, as HBase throws when table hasn't been written to
- // FIXME: Remove if this is fixed in 0.96
- }
- } finally {
- if (cache != null) { // Remove server cache if there is one
- cache.close();
}
+ } catch (TableNotFoundException e) {
+ // Ignore and continue, as HBase throws when table hasn't been written to
+ // FIXME: Remove if this is fixed in 0.96
}
-
- }
- final long count = totalMutationCount;
- return new MutationState(1, 1000, connection) {
- @Override
- public long getUpdateCount() {
- return count;
+ } finally {
+ if (cache != null) { // Remove server cache if there is one
+ cache.close();
}
- };
- } finally {
- if (!wasAutoCommit) connection.setAutoCommit(wasAutoCommit);
+ }
+
}
+ final long count = totalMutationCount;
+ return new MutationState(1, 1000, connection) {
+ @Override
+ public long getUpdateCount() {
+ return count;
+ }
+ };
+ } finally {
+ if (!wasAutoCommit) connection.setAutoCommit(wasAutoCommit);
+ }
+ }
+
+ private class SingleTableRefColumnResolver implements ColumnResolver {
+ private final TableRef tableRef;
+
+ public SingleTableRefColumnResolver(TableRef tableRef) {
+ this.tableRef = tableRef;
+ }
+
+ @Override
+ public List<TableRef> getTables() {
+ return Collections.singletonList(tableRef);
+ }
+
+ @Override
+ public List<PFunction> getFunctions() {
+ return Collections.emptyList();
+ }
+
+ ;
+
+ @Override
+ public TableRef resolveTable(String schemaName, String tableName)
+ throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException {
+ PColumn column = tableName != null
+ ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName)
+ : tableRef.getTable().getColumnForColumnName(colName);
+ return new ColumnRef(tableRef, column.getPosition());
+ }
+
+ @Override
+ public PFunction resolveFunction(String functionName) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasUDFs() {
+ return false;
+ }
+
+ @Override
+ public List<PSchema> getSchemas() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PSchema resolveSchema(String schemaName) throws SQLException {
+ throw new SchemaNotFoundException(schemaName);
}
- };
+ }
}
}
\ No newline at end of file