You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/20 08:30:39 UTC
[1/3] git commit: PHOENIX-1181 client cache fails to update itself
after a table was altered from a diff client
Repository: phoenix
Updated Branches:
refs/heads/3.0 91e5d3a1e -> 5ca432b2d
PHOENIX-1181 client cache fails to update itself after a table was altered from a diff client
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/42465628
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/42465628
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/42465628
Branch: refs/heads/3.0
Commit: 42465628683aa933805e3810fe1a299167344337
Parents: 7a61571
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Aug 19 00:49:22 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Aug 19 02:35:59 2014 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/AlterTableIT.java | 31 +-
.../phoenix/compile/CreateIndexCompiler.java | 2 +-
.../apache/phoenix/compile/DeleteCompiler.java | 93 ++++--
.../apache/phoenix/compile/FromCompiler.java | 19 +-
.../apache/phoenix/compile/UpsertCompiler.java | 287 +++++++++++--------
.../apache/phoenix/execute/MutationState.java | 7 +-
.../org/apache/phoenix/parse/DMLStatement.java | 27 ++
.../apache/phoenix/parse/DeleteStatement.java | 2 +-
.../apache/phoenix/parse/UpsertStatement.java | 2 +-
.../apache/phoenix/schema/MetaDataClient.java | 8 +-
10 files changed, 317 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 2af697b..ecdee66 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -837,4 +837,33 @@ public class AlterTableIT extends BaseHBaseManagedTimeIT {
}
}
- }
+ @Test
+ public void alterTableFromDifferentClient() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn3 = DriverManager.getConnection(getUrl(), props);
+
+ // PHOENIX-1181: conn1 creates the one-column table and upserts into it, then is kept open for reuse below
+ Connection conn1 = DriverManager.getConnection(getUrl(), props);
+ conn1.createStatement().execute("create table test_simpletable(id VARCHAR PRIMARY KEY, field1 BIGINT)");
+ PreparedStatement stmtInsert1 = conn1.prepareStatement("upsert into test_simpletable (id, field1) values ( ?, ?)");
+ stmtInsert1.setString(1, "key1");
+ stmtInsert1.setLong(2, 1L);
+ stmtInsert1.execute();
+ conn1.commit();
+ stmtInsert1.close();
+
+ // Do the alter through a separate client.
+ conn3.createStatement().execute("alter table test_simpletable add field2 BIGINT");
+
+ // Upsert into the new column through conn1, which last used the table before the ALTER was issued.
+ PreparedStatement pstmt2 = conn1.prepareStatement("upsert into test_simpletable (id, field1, field2) values ( ?, ?, ?)");
+ pstmt2.setString(1, "key2");
+ pstmt2.setLong(2, 2L);
+ pstmt2.setLong(3, 2L);
+ pstmt2.execute();
+ conn1.commit();
+ pstmt2.close();
+ conn1.close();
+ conn3.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index bbd7154..d6595ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -42,7 +42,7 @@ public class CreateIndexCompiler {
public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
final PhoenixConnection connection = statement.getConnection();
- final ColumnResolver resolver = FromCompiler.getResolverForMutation(create, connection);
+ final ColumnResolver resolver = FromCompiler.getResolver(create, connection);
Scan scan = new Scan();
final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 002f7b8..469bb30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
@@ -50,6 +51,7 @@ import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.DeleteStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.SelectStatement;
@@ -58,6 +60,8 @@ import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.PRow;
@@ -179,39 +183,68 @@ public class DeleteCompiler {
public MutationPlan compile(DeleteStatement delete) throws SQLException {
final PhoenixConnection connection = statement.getConnection();
final boolean isAutoCommit = connection.getAutoCommit();
+ final boolean hasLimit = delete.getLimit() != null;
final ConnectionQueryServices services = connection.getQueryServices();
- final ColumnResolver resolver = FromCompiler.getResolverForMutation(delete, connection);
- final TableRef tableRef = resolver.getTables().get(0);
- final PTable table = tableRef.getTable();
- if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
- throw new ReadOnlyTableException(table.getSchemaName().getString(),table.getTableName().getString());
- }
+ QueryPlan planToBe = null;
+ NamedTableNode tableNode = delete.getTable();
+ String tableName = tableNode.getName().getTableName();
+ String schemaName = tableNode.getName().getSchemaName();
+ boolean retryOnce = !isAutoCommit;
+ TableRef tableRefToBe;
+ boolean noQueryReqd = false;
+ boolean runOnServer = false;
+ SelectStatement select = null;
+ DeletingParallelIteratorFactory parallelIteratorFactory = null;
+ while (true) {
+ try {
+ ColumnResolver resolver = FromCompiler.getResolverForMutation(delete, connection);
+ tableRefToBe = resolver.getTables().get(0);
+ PTable table = tableRefToBe.getTable();
+ if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
+ throw new ReadOnlyTableException(table.getSchemaName().getString(),table.getTableName().getString());
+ }
+
+ noQueryReqd = !hasLimit && !hasImmutableIndex(tableRefToBe);
+ runOnServer = isAutoCommit && noQueryReqd;
+ HintNode hint = delete.getHint();
+ if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
+ hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
+ }
- final boolean hasLimit = delete.getLimit() != null;
- boolean noQueryReqd = !hasLimit && !hasImmutableIndex(tableRef);
- boolean runOnServer = isAutoCommit && noQueryReqd;
- HintNode hint = delete.getHint();
- if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
- hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
- }
-
- List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(table.getPKColumns().size());
- boolean isSalted = table.getBucketNum() != null;
- boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant();
- boolean isSharedViewIndex = table.getViewIndexId() != null;
- for (int i = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0); i < table.getPKColumns().size(); i++) {
- PColumn column = table.getPKColumns().get(i);
- aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null)));
+ List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(table.getPKColumns().size());
+ boolean isSalted = table.getBucketNum() != null;
+ boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant();
+ boolean isSharedViewIndex = table.getViewIndexId() != null;
+ for (int i = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0); i < table.getPKColumns().size(); i++) {
+ PColumn column = table.getPKColumns().get(i);
+ aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null)));
+ }
+ select = FACTORY.select(
+ Collections.singletonList(delete.getTable()),
+ hint, false, aliasedNodes, delete.getWhere(),
+ Collections.<ParseNode>emptyList(), null,
+ delete.getOrderBy(), delete.getLimit(),
+ delete.getBindCount(), false, false);
+ select = StatementNormalizer.normalize(select, resolver);
+ parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection, tableRefToBe);
+ planToBe = new QueryOptimizer(services).optimize(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory);
+ } catch (MetaDataEntityNotFoundException e) {
+ // Catch column/column family not found exception, as our meta data may
+ // be out of sync. Update the cache once and retry if we were out of sync.
+ // Otherwise throw, as we'll just get the same error next time.
+ if (retryOnce) {
+ retryOnce = false;
+ MetaDataMutationResult result = new MetaDataClient(connection).updateCache(schemaName, tableName);
+ if (result.wasUpdated()) {
+ continue;
+ }
+ }
+ throw e;
+ }
+ break;
}
- SelectStatement select = FACTORY.select(
- Collections.singletonList(delete.getTable()),
- hint, false, aliasedNodes, delete.getWhere(),
- Collections.<ParseNode>emptyList(), null,
- delete.getOrderBy(), delete.getLimit(),
- delete.getBindCount(), false, false);
- select = StatementNormalizer.normalize(select, resolver);
- DeletingParallelIteratorFactory parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection, tableRef);
- final QueryPlan plan = new QueryOptimizer(services).optimize(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory);
+ final TableRef tableRef = tableRefToBe;
+ final QueryPlan plan = planToBe;
if (!plan.getTableRef().equals(tableRef)) {
runOnServer = false;
noQueryReqd = false;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index b9e0949..02d4f99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.BindTableNode;
import org.apache.phoenix.parse.ColumnDef;
import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.DMLStatement;
import org.apache.phoenix.parse.DerivedTableNode;
import org.apache.phoenix.parse.FamilyWildcardParseNode;
import org.apache.phoenix.parse.JoinTableNode;
@@ -168,12 +169,26 @@ public class FromCompiler {
return visitor;
}
- public static ColumnResolver getResolverForMutation(SingleTableStatement statement, PhoenixConnection connection)
+ public static ColumnResolver getResolver(SingleTableStatement statement, PhoenixConnection connection)
throws SQLException {
- SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false);
+ SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true);
return visitor;
}
+
+ public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection)
+ throws SQLException {
+ return getResolverForMutation(statement, connection, false);
+ }
+ public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection, boolean updateCacheImmediately)
+ throws SQLException {
+ /*
+ * With updateCacheImmediately false, meta data validation is left to commit time, so many UPSERT VALUES
+ * calls can run without a server round trip each; callers pass true to have the cache checked up front.
+ */
+ SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), updateCacheImmediately);
+ return visitor;
+ }
private static class SingleTableColumnResolver extends BaseColumnResolver {
private final List<TableRef> tableRefs;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 52cf7d3..d625a9d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -37,6 +37,7 @@ import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -59,6 +60,7 @@ import org.apache.phoenix.parse.ColumnName;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.LiteralParseNode;
+import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.SequenceValueParseNode;
@@ -68,6 +70,8 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PDataType;
@@ -207,138 +211,182 @@ public class UpsertCompiler {
final PhoenixConnection connection = statement.getConnection();
ConnectionQueryServices services = connection.getQueryServices();
final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
- final ColumnResolver resolver = FromCompiler.getResolverForMutation(upsert, connection);
- final TableRef tableRef = resolver.getTables().get(0);
- final PTable table = tableRef.getTable();
- if (table.getType() == PTableType.VIEW) {
- if (table.getViewType().isReadOnly()) {
- throw new ReadOnlyTableException(table.getSchemaName().getString(),table.getTableName().getString());
- }
- }
- boolean isSalted = table.getBucketNum() != null;
- final boolean isTenantSpecific = table.isMultiTenant() && connection.getTenantId() != null;
- final boolean isSharedViewIndex = table.getViewIndexId() != null;
- String tenantId = isTenantSpecific ? connection.getTenantId().getString() : null;
- int posOffset = isSalted ? 1 : 0;
- // Setup array of column indexes parallel to values that are going to be set
List<ColumnName> columnNodes = upsert.getColumns();
- final List<PColumn> allColumns = table.getColumns();
+ TableRef tableRefToBe = null;
+ PTable table = null;
Set<PColumn> addViewColumnsToBe = Collections.emptySet();
Set<PColumn> overlapViewColumnsToBe = Collections.emptySet();
-
+ List<PColumn> allColumnsToBe = Collections.emptyList();
+ boolean isTenantSpecific = false;
+ boolean isSharedViewIndex = false;
+ String tenantId = null;
+ ColumnResolver resolver = null;
int[] columnIndexesToBe;
int nColumnsToSet = 0;
int[] pkSlotIndexesToBe;
List<PColumn> targetColumns;
- if (table.getViewType() == ViewType.UPDATABLE) {
- addViewColumnsToBe = Sets.newLinkedHashSetWithExpectedSize(allColumns.size());
- for (PColumn column : allColumns) {
- if (column.getViewConstant() != null) {
- addViewColumnsToBe.add(column);
- }
- }
- }
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- // Allow full row upsert if no columns or only dynamic ones are specified and values count match
- if (columnNodes.isEmpty() || columnNodes.size() == upsert.getTable().getDynamicColumns().size()) {
- nColumnsToSet = allColumns.size() - posOffset;
- columnIndexesToBe = new int[nColumnsToSet];
- pkSlotIndexesToBe = new int[columnIndexesToBe.length];
- targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
- targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
- int minPKPos = 0;
- if (isTenantSpecific) {
- PColumn tenantColumn = table.getPKColumns().get(minPKPos);
- columnIndexesToBe[minPKPos] = tenantColumn.getPosition();
- targetColumns.set(minPKPos, tenantColumn);
- minPKPos++;
- }
- if (isSharedViewIndex) {
- PColumn indexIdColumn = table.getPKColumns().get(minPKPos);
- columnIndexesToBe[minPKPos] = indexIdColumn.getPosition();
- targetColumns.set(minPKPos, indexIdColumn);
- minPKPos++;
- }
- for (int i = posOffset, j = 0; i < allColumns.size(); i++) {
- PColumn column = allColumns.get(i);
- if (SchemaUtil.isPKColumn(column)) {
- pkSlotIndexesToBe[i-posOffset] = j + posOffset;
- if (j++ < minPKPos) { // Skip, as it's already been set above
- continue;
+ NamedTableNode tableNode = upsert.getTable();
+ String tableName = tableNode.getName().getTableName();
+ String schemaName = tableNode.getName().getSchemaName();
+ // Retry once if columns are explicitly named, as the meta data may
+ // be out of date. We do not retry if auto commit is on, as we
+ // update the cache up front when we create the resolver in that case.
+ boolean retryOnce = !columnNodes.isEmpty() && !connection.getAutoCommit();
+ while (true) {
+ try {
+ /*
+ * Update the cache up front if we don't specify columns. This is going
+ * to cause a slow down, as every statement execution will check if the
+ * meta data is out-of-date. There's no way around this, though, as we
+ * compile in information based on the columns we find when none are
+ * specified. For example, a column could have been removed and a new
+ * one added and we may be assuming the wrong type if we don't do this
+ * now. There's a hole in the existing algorithm too, even if columns
+ * are specified. If one was removed and then added back with the same
+ * name, but a different type, we'll run into problems.
+ */
+ resolver = FromCompiler.getResolverForMutation(upsert, connection, columnNodes.isEmpty());
+ tableRefToBe = resolver.getTables().get(0);
+ table = tableRefToBe.getTable();
+ if (table.getType() == PTableType.VIEW) {
+ if (table.getViewType().isReadOnly()) {
+ throw new ReadOnlyTableException(schemaName,tableName);
}
- minPKPos = 0;
}
- columnIndexesToBe[i-posOffset+minPKPos] = i;
- targetColumns.set(i-posOffset+minPKPos, column);
- }
- if (!addViewColumnsToBe.isEmpty()) {
- // All view columns overlap in this case
- overlapViewColumnsToBe = addViewColumnsToBe;
- addViewColumnsToBe = Collections.emptySet();
- }
- } else {
- // Size for worse case
- int numColsInUpsert = columnNodes.size();
- nColumnsToSet = numColsInUpsert + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + + (isSharedViewIndex ? 1 : 0);
- columnIndexesToBe = new int[nColumnsToSet];
- pkSlotIndexesToBe = new int[columnIndexesToBe.length];
- targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
- targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
- Arrays.fill(columnIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
- Arrays.fill(pkSlotIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
- BitSet pkColumnsSet = new BitSet(table.getPKColumns().size());
- int i = 0;
- // Add tenant column directly, as we don't want to resolve it as this will fail
- if (isTenantSpecific) {
- PColumn tenantColumn = table.getPKColumns().get(i + posOffset);
- columnIndexesToBe[i] = tenantColumn.getPosition();
- pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset);
- targetColumns.set(i, tenantColumn);
- i++;
- }
- if (isSharedViewIndex) {
- PColumn indexIdColumn = table.getPKColumns().get(i + posOffset);
- columnIndexesToBe[i] = indexIdColumn.getPosition();
- pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset);
- targetColumns.set(i, indexIdColumn);
- i++;
- }
- for (ColumnName colName : columnNodes) {
- ColumnRef ref = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName());
- PColumn column = ref.getColumn();
- if (IndexUtil.getViewConstantValue(column, ptr)) {
- if (overlapViewColumnsToBe.isEmpty()) {
- overlapViewColumnsToBe = Sets.newHashSetWithExpectedSize(addViewColumnsToBe.size());
+ boolean isSalted = table.getBucketNum() != null;
+ isTenantSpecific = table.isMultiTenant() && connection.getTenantId() != null;
+ isSharedViewIndex = table.getViewIndexId() != null;
+ tenantId = isTenantSpecific ? connection.getTenantId().getString() : null;
+ int posOffset = isSalted ? 1 : 0;
+ // Setup array of column indexes parallel to values that are going to be set
+ allColumnsToBe = table.getColumns();
+
+ nColumnsToSet = 0;
+ if (table.getViewType() == ViewType.UPDATABLE) {
+ addViewColumnsToBe = Sets.newLinkedHashSetWithExpectedSize(allColumnsToBe.size());
+ for (PColumn column : allColumnsToBe) {
+ if (column.getViewConstant() != null) {
+ addViewColumnsToBe.add(column);
+ }
}
- nColumnsToSet--;
- overlapViewColumnsToBe.add(column);
- addViewColumnsToBe.remove(column);
}
- columnIndexesToBe[i] = ref.getColumnPosition();
- targetColumns.set(i, column);
- if (SchemaUtil.isPKColumn(column)) {
- pkColumnsSet.set(pkSlotIndexesToBe[i] = ref.getPKSlotPosition());
- }
- i++;
- }
- for (PColumn column : addViewColumnsToBe) {
- columnIndexesToBe[i] = column.getPosition();
- targetColumns.set(i, column);
- if (SchemaUtil.isPKColumn(column)) {
- pkColumnsSet.set(pkSlotIndexesToBe[i] = SchemaUtil.getPKPosition(table, column));
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ // Allow full row upsert if no columns or only dynamic ones are specified and values count match
+ if (columnNodes.isEmpty() || columnNodes.size() == upsert.getTable().getDynamicColumns().size()) {
+ nColumnsToSet = allColumnsToBe.size() - posOffset;
+ columnIndexesToBe = new int[nColumnsToSet];
+ pkSlotIndexesToBe = new int[columnIndexesToBe.length];
+ targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
+ targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
+ int minPKPos = 0;
+ if (isTenantSpecific) {
+ PColumn tenantColumn = table.getPKColumns().get(minPKPos);
+ columnIndexesToBe[minPKPos] = tenantColumn.getPosition();
+ targetColumns.set(minPKPos, tenantColumn);
+ minPKPos++;
+ }
+ if (isSharedViewIndex) {
+ PColumn indexIdColumn = table.getPKColumns().get(minPKPos);
+ columnIndexesToBe[minPKPos] = indexIdColumn.getPosition();
+ targetColumns.set(minPKPos, indexIdColumn);
+ minPKPos++;
+ }
+ for (int i = posOffset, j = 0; i < allColumnsToBe.size(); i++) {
+ PColumn column = allColumnsToBe.get(i);
+ if (SchemaUtil.isPKColumn(column)) {
+ pkSlotIndexesToBe[i-posOffset] = j + posOffset;
+ if (j++ < minPKPos) { // Skip, as it's already been set above
+ continue;
+ }
+ minPKPos = 0;
+ }
+ columnIndexesToBe[i-posOffset+minPKPos] = i;
+ targetColumns.set(i-posOffset+minPKPos, column);
+ }
+ if (!addViewColumnsToBe.isEmpty()) {
+ // All view columns overlap in this case
+ overlapViewColumnsToBe = addViewColumnsToBe;
+ addViewColumnsToBe = Collections.emptySet();
+ }
+ } else {
+ // Size for worse case
+ int numColsInUpsert = columnNodes.size();
+ nColumnsToSet = numColsInUpsert + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + + (isSharedViewIndex ? 1 : 0);
+ columnIndexesToBe = new int[nColumnsToSet];
+ pkSlotIndexesToBe = new int[columnIndexesToBe.length];
+ targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
+ targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
+ Arrays.fill(columnIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
+ Arrays.fill(pkSlotIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
+ BitSet pkColumnsSet = new BitSet(table.getPKColumns().size());
+ int i = 0;
+ // Add tenant column directly, as we don't want to resolve it as this will fail
+ if (isTenantSpecific) {
+ PColumn tenantColumn = table.getPKColumns().get(i + posOffset);
+ columnIndexesToBe[i] = tenantColumn.getPosition();
+ pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset);
+ targetColumns.set(i, tenantColumn);
+ i++;
+ }
+ if (isSharedViewIndex) {
+ PColumn indexIdColumn = table.getPKColumns().get(i + posOffset);
+ columnIndexesToBe[i] = indexIdColumn.getPosition();
+ pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset);
+ targetColumns.set(i, indexIdColumn);
+ i++;
+ }
+ for (ColumnName colName : columnNodes) {
+ ColumnRef ref = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName());
+ PColumn column = ref.getColumn();
+ if (IndexUtil.getViewConstantValue(column, ptr)) {
+ if (overlapViewColumnsToBe.isEmpty()) {
+ overlapViewColumnsToBe = Sets.newHashSetWithExpectedSize(addViewColumnsToBe.size());
+ }
+ nColumnsToSet--;
+ overlapViewColumnsToBe.add(column);
+ addViewColumnsToBe.remove(column);
+ }
+ columnIndexesToBe[i] = ref.getColumnPosition();
+ targetColumns.set(i, column);
+ if (SchemaUtil.isPKColumn(column)) {
+ pkColumnsSet.set(pkSlotIndexesToBe[i] = ref.getPKSlotPosition());
+ }
+ i++;
+ }
+ for (PColumn column : addViewColumnsToBe) {
+ columnIndexesToBe[i] = column.getPosition();
+ targetColumns.set(i, column);
+ if (SchemaUtil.isPKColumn(column)) {
+ pkColumnsSet.set(pkSlotIndexesToBe[i] = SchemaUtil.getPKPosition(table, column));
+ }
+ i++;
+ }
+ for (i = posOffset; i < table.getPKColumns().size(); i++) {
+ PColumn pkCol = table.getPKColumns().get(i);
+ if (!pkColumnsSet.get(i)) {
+ if (!pkCol.isNullable()) {
+ throw new ConstraintViolationException(table.getName().getString() + "." + pkCol.getName().getString() + " may not be null");
+ }
+ }
+ }
}
- i++;
- }
- for (i = posOffset; i < table.getPKColumns().size(); i++) {
- PColumn pkCol = table.getPKColumns().get(i);
- if (!pkColumnsSet.get(i)) {
- if (!pkCol.isNullable()) {
- throw new ConstraintViolationException(table.getName().getString() + "." + pkCol.getName().getString() + " may not be null");
+ break;
+ } catch (MetaDataEntityNotFoundException e) {
+ // Catch column/column family not found exception, as our meta data may
+ // be out of sync. Update the cache once and retry if we were out of sync.
+ // Otherwise throw, as we'll just get the same error next time.
+ if (retryOnce) {
+ retryOnce = false;
+ MetaDataMutationResult result = new MetaDataClient(connection).updateCache(schemaName, tableName);
+ if (result.wasUpdated()) {
+ continue;
}
}
+ throw e;
}
- }
+ }
+ final List<PColumn> allColumns = allColumnsToBe;
List<ParseNode> valueNodes = upsert.getValues();
QueryPlan plan = null;
RowProjector rowProjectorToBe = null;
@@ -355,7 +403,7 @@ public class UpsertCompiler {
select = StatementNormalizer.normalize(select, selectResolver);
select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
sameTable = select.getFrom().size() == 1
- && tableRef.equals(selectResolver.getTables().get(0));
+ && tableRefToBe.equals(selectResolver.getTables().get(0));
/* We can run the upsert in a coprocessor if:
* 1) from has only 1 table and the into table matches from table
* 2) the select query isn't doing aggregation (which requires a client-side final merge)
@@ -374,7 +422,7 @@ public class UpsertCompiler {
} else {
// We can pipeline the upsert select instead of spooling everything to disk first,
// if we don't have any post processing that's required.
- parallelIteratorFactory = upsertParallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRef);
+ parallelIteratorFactory = upsertParallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe);
// If we're in the else, then it's not an aggregate, distinct, limted, or sequence using query,
// so we might be able to run it entirely on the server side.
runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty());
@@ -392,7 +440,7 @@ public class UpsertCompiler {
// Pass scan through if same table in upsert and select so that projection is computed correctly
// Use optimizer to choose the best plan
plan = new QueryOptimizer(services).optimize(statement, select, selectResolver, targetColumns, parallelIteratorFactory);
- runOnServer &= plan.getTableRef().equals(tableRef);
+ runOnServer &= plan.getTableRef().equals(tableRefToBe);
rowProjectorToBe = plan.getProjector();
nValuesToSet = rowProjectorToBe.getColumnCount();
// Cannot auto commit if doing aggregation or topN or salted
@@ -416,6 +464,7 @@ public class UpsertCompiler {
.build().buildException();
}
+ final TableRef tableRef = tableRefToBe;
final int[] columnIndexes = columnIndexesToBe;
final int[] pkSlotIndexes = pkSlotIndexesToBe;
final Set<PColumn> addViewColumns = addViewColumnsToBe;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 911e3ea..532255f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -262,8 +262,9 @@ public class MutationState implements SQLCloseable {
}
/**
- * Validates that the meta data is still valid based on the current server time
- * and returns the server time to use for the upsert for each table.
+ * Validates that the meta data is valid against the server meta data if we haven't yet done so.
+ * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
+ * has changed.
* @param connection
* @return the server time to use for the upsert
* @throws SQLException if the table or any columns no longer exist
@@ -278,6 +279,8 @@ public class MutationState implements SQLCloseable {
TableRef tableRef = entry.getKey();
long serverTimeStamp = tableRef.getTimeStamp();
PTable table = tableRef.getTable();
+ // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
+ // so no need to do it again here.
if (!connection.getAutoCommit()) {
MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
long timestamp = result.getMutationTime();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/parse/DMLStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DMLStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DMLStatement.java
new file mode 100644
index 0000000..46bd907
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DMLStatement.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+public class DMLStatement extends SingleTableStatement {
+
+ public DMLStatement(NamedTableNode table, int bindCount) {
+ super(table, bindCount);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
index b15b325..d295e85 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
-public class DeleteStatement extends SingleTableStatement implements FilterableStatement {
+public class DeleteStatement extends DMLStatement implements FilterableStatement {
private final ParseNode whereNode;
private final List<OrderByNode> orderBy;
private final LimitNode limit;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
index bb23421..fc299d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.parse;
import java.util.Collections;
import java.util.List;
-public class UpsertStatement extends SingleTableStatement {
+public class UpsertStatement extends DMLStatement {
private final List<ColumnName> columns;
private final List<ParseNode> values;
private final SelectStatement select;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42465628/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 64cf70c..a462a5d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -574,7 +574,7 @@ public class MetaDataClient {
boolean allocateViewIndexId = false;
while (true) {
try {
- ColumnResolver resolver = FromCompiler.getResolverForMutation(statement, connection);
+ ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
tableRef = resolver.getTables().get(0);
PTable dataTable = tableRef.getTable();
boolean isTenantConnection = connection.getTenantId() != null;
@@ -1508,7 +1508,7 @@ public class MetaDataClient {
boolean retried = false;
while (true) {
List<Mutation> tableMetaData = Lists.newArrayListWithExpectedSize(2);
- ColumnResolver resolver = FromCompiler.getResolverForMutation(statement, connection);
+ ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
PTable table = resolver.getTables().get(0).getTable();
if (logger.isDebugEnabled()) {
logger.debug("Resolved table to " + table.getName().getString() + " with seqNum " + table.getSequenceNumber() + " at timestamp " + table.getTimeStamp() + " with " + table.getColumns().size() + " columns: " + table.getColumns());
@@ -1822,7 +1822,7 @@ public class MetaDataClient {
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
boolean retried = false;
while (true) {
- final ColumnResolver resolver = FromCompiler.getResolverForMutation(statement, connection);
+ final ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
PTable table = resolver.getTables().get(0).getTable();
List<ColumnName> columnRefs = statement.getColumnRefs();
if(columnRefs == null) {
@@ -1996,7 +1996,7 @@ public class MetaDataClient {
}
connection.setAutoCommit(false);
// Confirm index table is valid and up-to-date
- TableRef indexRef = FromCompiler.getResolverForMutation(statement, connection).getTables().get(0);
+ TableRef indexRef = FromCompiler.getResolver(statement, connection).getTables().get(0);
PreparedStatement tableUpsert = null;
try {
if(newIndexState == PIndexState.ACTIVE){
[3/3] git commit: PHOENIX-1181 client cache fails to update itself
after a table was altered from a diff client
Posted by ja...@apache.org.
PHOENIX-1181 client cache fails to update itself after a table was altered from a diff client
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5ca432b2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5ca432b2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5ca432b2
Branch: refs/heads/3.0
Commit: 5ca432b2dd4b51c8314a5659a10e33eb4b7693bd
Parents: 23fc482
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Aug 19 23:34:31 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Aug 19 23:34:31 2014 -0700
----------------------------------------------------------------------
.../apache/phoenix/compile/FromCompiler.java | 9 +-
.../apache/phoenix/compile/UpsertCompiler.java | 201 ++++++++++---------
2 files changed, 103 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5ca432b2/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 02d4f99..efc0973 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -177,19 +177,14 @@ public class FromCompiler {
public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection)
throws SQLException {
- return getResolverForMutation(statement, connection, false);
- }
-
- public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection, boolean updateCacheImmediately)
- throws SQLException {
/*
* We validate the meta data at commit time for mutations, as this allows us to do many UPSERT VALUES calls
* without hitting the server each time to check if the meta data is up-to-date.
*/
- SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), updateCacheImmediately);
+ SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), false);
return visitor;
}
-
+
private static class SingleTableColumnResolver extends BaseColumnResolver {
private final List<TableRef> tableRefs;
private final String alias;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5ca432b2/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index d625a9d..b8e1a6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -37,7 +37,6 @@ import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -48,7 +47,6 @@ import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
@@ -224,28 +222,23 @@ public class UpsertCompiler {
int[] columnIndexesToBe;
int nColumnsToSet = 0;
int[] pkSlotIndexesToBe;
+ List<ParseNode> valueNodes = upsert.getValues();
List<PColumn> targetColumns;
NamedTableNode tableNode = upsert.getTable();
String tableName = tableNode.getName().getTableName();
String schemaName = tableNode.getName().getSchemaName();
- // Retry once if columns are explicitly named, as the meta data may
+ QueryPlan queryPlanToBe = null;
+ int nValuesToSet;
+ boolean sameTable = false;
+ boolean runOnServer = false;
+ UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null;
+ // Retry once if auto commit is off, as the meta data may
// be out of date. We do not retry if auto commit is on, as we
// update the cache up front when we create the resolver in that case.
- boolean retryOnce = !columnNodes.isEmpty() && !connection.getAutoCommit();
+ boolean retryOnce = !connection.getAutoCommit();
while (true) {
try {
- /*
- * Update the cache up front if we don't specify columns. This is going
- * to cause a slow down, as every statement execution will check if the
- * meta data is out-of-date. There's no way around this, though, as we
- * compile in information based on the columns we find when none are
- * specified. For example, a column could have been removed and a new
- * one added and we may be assuming the wrong type if we don't do this
- * now. There's a hole in the existing algorithm too, even if columns
- * are specified. If one was removed and then added back with the same
- * name, but a different type, we'll run into problems.
- */
- resolver = FromCompiler.getResolverForMutation(upsert, connection, columnNodes.isEmpty());
+ resolver = FromCompiler.getResolverForMutation(upsert, connection);
tableRefToBe = resolver.getTables().get(0);
table = tableRefToBe.getTable();
if (table.getType() == PTableType.VIEW) {
@@ -370,105 +363,113 @@ public class UpsertCompiler {
}
}
}
- break;
+ boolean isAutoCommit = connection.getAutoCommit();
+ if (valueNodes == null) {
+ SelectStatement select = upsert.getSelect();
+ assert(select != null);
+ select = SubselectRewriter.flatten(select, connection);
+ ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection);
+ select = StatementNormalizer.normalize(select, selectResolver);
+ select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
+ sameTable = select.getFrom().size() == 1
+ && tableRefToBe.equals(selectResolver.getTables().get(0));
+ /* We can run the upsert in a coprocessor if:
+ * 1) from has only 1 table and the into table matches from table
+ * 2) the select query isn't doing aggregation (which requires a client-side final merge)
+ * 3) autoCommit is on
+ * 4) the table is not immutable with indexes, as the client is the one that figures out the additional
+ * puts for index tables.
+ * 5) no limit clause, as the limit clause requires client-side post processing
+ * 6) no sequences, as sequences imply that the order of upsert must match the order of
+ * selection.
+ * Otherwise, run the query to pull the data from the server
+ * and populate the MutationState (up to a limit).
+ */
+ if (! (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) ) {
+ // We can pipeline the upsert select instead of spooling everything to disk first,
+ // if we don't have any post processing that's required.
+ parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe);
+ // If we get here, then it's not an aggregate, distinct, limited, or sequence-using query,
+ // so we might be able to run it entirely on the server side.
+ runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty());
+ }
+ // If we may be able to run on the server, add a hint that favors using the data table
+ // if all else is equal.
+ // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing,
+ // as this would disallow running on the server. We currently use the row projector we
+ // get back to figure this out.
+ HintNode hint = upsert.getHint();
+ if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
+ hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
+ }
+ select = SelectStatement.create(select, hint);
+ // Pass scan through if same table in upsert and select so that projection is computed correctly
+ // Use optimizer to choose the best plan
+ try {
+ QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement));
+ queryPlanToBe = compiler.compile();
+ } catch (MetaDataEntityNotFoundException e) {
+ retryOnce = false; // don't retry if select clause has meta data entities that aren't found, as we already updated the cache
+ throw e;
+ }
+ nValuesToSet = queryPlanToBe.getProjector().getColumnCount();
+ // Cannot auto commit if doing aggregation or topN or salted
+ // Salted causes problems because the row may end up living on a different region
+ } else {
+ nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
+ }
+ // Resize down to allow a subset of columns to be specifiable
+ if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
+ nColumnsToSet = nValuesToSet;
+ columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet);
+ pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet);
+ }
+
+ if (nValuesToSet != nColumnsToSet) {
+ // We might have added columns, so refresh cache and try again if stale.
+ // Note that this check is not really sufficient, as a column could have
+ // been removed and then added back and we wouldn't detect that here.
+ if (retryOnce) {
+ retryOnce = false;
+ if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) {
+ continue;
+ }
+ }
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.UPSERT_COLUMN_NUMBERS_MISMATCH)
+ .setMessage("Numbers of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet)
+ .build().buildException();
+ }
} catch (MetaDataEntityNotFoundException e) {
// Catch column/column family not found exception, as our meta data may
// be out of sync. Update the cache once and retry if we were out of sync.
// Otherwise throw, as we'll just get the same error next time.
if (retryOnce) {
retryOnce = false;
- MetaDataMutationResult result = new MetaDataClient(connection).updateCache(schemaName, tableName);
- if (result.wasUpdated()) {
+ if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) {
continue;
}
}
throw e;
}
- }
-
- final List<PColumn> allColumns = allColumnsToBe;
- List<ParseNode> valueNodes = upsert.getValues();
- QueryPlan plan = null;
- RowProjector rowProjectorToBe = null;
- final int nValuesToSet;
- boolean sameTable = false;
- boolean runOnServer = false;
- UpsertingParallelIteratorFactory upsertParallelIteratorFactoryToBe = null;
- final boolean isAutoCommit = connection.getAutoCommit();
- if (valueNodes == null) {
- SelectStatement select = upsert.getSelect();
- assert(select != null);
- select = SubselectRewriter.flatten(select, connection);
- ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection);
- select = StatementNormalizer.normalize(select, selectResolver);
- select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
- sameTable = select.getFrom().size() == 1
- && tableRefToBe.equals(selectResolver.getTables().get(0));
- /* We can run the upsert in a coprocessor if:
- * 1) from has only 1 table and the into table matches from table
- * 2) the select query isn't doing aggregation (which requires a client-side final merge)
- * 3) autoCommit is on
- * 4) the table is not immutable with indexes, as the client is the one that figures out the additional
- * puts for index tables.
- * 5) no limit clause, as the limit clause requires client-side post processing
- * 6) no sequences, as sequences imply that the order of upsert must match the order of
- * selection.
- * Otherwise, run the query to pull the data from the server
- * and populate the MutationState (upto a limit).
- */
- ParallelIteratorFactory parallelIteratorFactory;
- if (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) {
- parallelIteratorFactory = null;
- } else {
- // We can pipeline the upsert select instead of spooling everything to disk first,
- // if we don't have any post processing that's required.
- parallelIteratorFactory = upsertParallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe);
- // If we're in the else, then it's not an aggregate, distinct, limted, or sequence using query,
- // so we might be able to run it entirely on the server side.
- runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty());
- }
- // If we may be able to run on the server, add a hint that favors using the data table
- // if all else is equal.
- // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing,
- // as this would disallow running on the server. We currently use the row projector we
- // get back to figure this out.
- HintNode hint = upsert.getHint();
- if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
- hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
- }
- select = SelectStatement.create(select, hint);
- // Pass scan through if same table in upsert and select so that projection is computed correctly
- // Use optimizer to choose the best plan
- plan = new QueryOptimizer(services).optimize(statement, select, selectResolver, targetColumns, parallelIteratorFactory);
- runOnServer &= plan.getTableRef().equals(tableRefToBe);
- rowProjectorToBe = plan.getProjector();
- nValuesToSet = rowProjectorToBe.getColumnCount();
- // Cannot auto commit if doing aggregation or topN or salted
- // Salted causes problems because the row may end up living on a different region
- } else {
- nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
- }
- final RowProjector projector = rowProjectorToBe;
- final UpsertingParallelIteratorFactory upsertParallelIteratorFactory = upsertParallelIteratorFactoryToBe;
- final QueryPlan queryPlan = plan;
- // Resize down to allow a subset of columns to be specifiable
- if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
- nColumnsToSet = nValuesToSet;
- columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet);
- pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet);
+ break;
}
- if (nValuesToSet != nColumnsToSet) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.UPSERT_COLUMN_NUMBERS_MISMATCH)
- .setMessage("Numbers of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet)
- .build().buildException();
+ RowProjector projectorToBe = null;
+ // Optimize only after all checks have been performed
+ if (valueNodes == null) {
+ queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe, statement, targetColumns, parallelIteratorFactoryToBe);
+ projectorToBe = queryPlanToBe.getProjector();
+ runOnServer &= queryPlanToBe.getTableRef().equals(tableRefToBe);
}
-
+ final List<PColumn> allColumns = allColumnsToBe;
+ final RowProjector projector = projectorToBe;
+ final QueryPlan queryPlan = queryPlanToBe;
final TableRef tableRef = tableRefToBe;
final int[] columnIndexes = columnIndexesToBe;
final int[] pkSlotIndexes = pkSlotIndexesToBe;
final Set<PColumn> addViewColumns = addViewColumnsToBe;
final Set<PColumn> overlapViewColumns = overlapViewColumnsToBe;
+ final UpsertingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
// TODO: break this up into multiple functions
////////////////////////////////////////////////////////////////////
@@ -648,12 +649,12 @@ public class UpsertCompiler {
@Override
public MutationState execute() throws SQLException {
ResultIterator iterator = queryPlan.iterator();
- if (upsertParallelIteratorFactory == null) {
+ if (parallelIteratorFactory == null) {
return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
}
- upsertParallelIteratorFactory.setRowProjector(projector);
- upsertParallelIteratorFactory.setColumnIndexes(columnIndexes);
- upsertParallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+ parallelIteratorFactory.setRowProjector(projector);
+ parallelIteratorFactory.setColumnIndexes(columnIndexes);
+ parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
Tuple tuple;
long totalRowCount = 0;
while ((tuple=iterator.next()) != null) {// Runs query
[2/3] git commit: Merge branch '3.0' of
https://git-wip-us.apache.org/repos/asf/phoenix into 3.0
Posted by ja...@apache.org.
Merge branch '3.0' of https://git-wip-us.apache.org/repos/asf/phoenix into 3.0
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/23fc4825
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/23fc4825
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/23fc4825
Branch: refs/heads/3.0
Commit: 23fc4825083479d190fb6d0a9e3daee9a05d5238
Parents: 4246562 91e5d3a
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Aug 19 20:51:46 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Aug 19 20:51:46 2014 -0700
----------------------------------------------------------------------
CHANGES | 9 +-
dev/make_rc.sh | 2 +-
.../apache/phoenix/end2end/DerivedTableIT.java | 13 ++
.../org/apache/phoenix/end2end/HashJoinIT.java | 49 ++++
.../MutatingParallelIteratorFactory.java | 3 +-
.../apache/phoenix/compile/QueryCompiler.java | 13 +-
.../phoenix/compile/StatementContext.java | 23 --
.../apache/phoenix/execute/AggregatePlan.java | 9 +-
.../phoenix/iterate/ChunkedResultIterator.java | 27 +--
.../phoenix/iterate/ParallelIterators.java | 7 +-
.../phoenix/iterate/SpoolingResultIterator.java | 3 +-
.../RoundFloorCeilExpressionsTest.java | 230 +++++++++++++++++++
.../RoundFloorCeilExpressionsUnitTests.java | 230 -------------------
.../phoenix/query/KeyRangeCoalesceTest.java | 161 +++++++++++++
.../phoenix/query/KeyRangeCoalesceTests.java | 161 -------------
.../phoenix/query/KeyRangeIntersectTest.java | 97 ++++++++
.../phoenix/query/KeyRangeIntersectTests.java | 97 --------
.../apache/phoenix/query/KeyRangeUnionTest.java | 97 ++++++++
.../phoenix/query/KeyRangeUnionTests.java | 97 --------
19 files changed, 689 insertions(+), 639 deletions(-)
----------------------------------------------------------------------