You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/12/09 04:42:15 UTC
phoenix git commit: PHOENIX-2383 Implement Sequence in
Phoenix/Calcite integration
Repository: phoenix
Updated Branches:
refs/heads/calcite 9a3e5e51b -> 57c3f3dba
PHOENIX-2383 Implement Sequence in Phoenix/Calcite integration
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/57c3f3db
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/57c3f3db
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/57c3f3db
Branch: refs/heads/calcite
Commit: 57c3f3dba5fff6f852ca66a135dc768a560acf78
Parents: 9a3e5e5
Author: maryannxue <we...@intel.com>
Authored: Tue Dec 8 22:42:01 2015 -0500
Committer: maryannxue <we...@intel.com>
Committed: Tue Dec 8 22:42:01 2015 -0500
----------------------------------------------------------------------
.../org/apache/phoenix/calcite/CalciteIT.java | 44 +++++++++++-
.../apache/phoenix/calcite/CalciteUtils.java | 73 ++++++++++++++++++++
.../apache/phoenix/calcite/PhoenixSchema.java | 62 ++++++++++++++++-
.../apache/phoenix/calcite/PhoenixSequence.java | 42 +++++++++++
.../calcite/rel/PhoenixClientProject.java | 34 ++++++++-
.../phoenix/calcite/rel/PhoenixFilter.java | 8 +++
.../apache/phoenix/calcite/rel/PhoenixRel.java | 6 ++
.../calcite/rel/PhoenixRelImplementorImpl.java | 22 ++++++
.../calcite/rules/PhoenixConverterRules.java | 47 +++++--------
.../apache/phoenix/compile/SequenceManager.java | 20 +++---
10 files changed, 316 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
index bac09bd..dd35b23 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
+import org.apache.phoenix.schema.SequenceAlreadyExistsException;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
@@ -261,7 +262,8 @@ public class CalciteIT extends BaseClientManagedTimeIT {
"CREATE INDEX IDX1 ON aTable (a_string) INCLUDE (b_string, x_integer)",
"CREATE INDEX IDX2 ON aTable (b_string) INCLUDE (a_string, y_integer)",
"CREATE INDEX IDX_FULL ON aTable (b_string) INCLUDE (a_string, a_integer, a_date, a_time, a_timestamp, x_decimal, x_long, x_integer, y_integer, a_byte, a_short, a_float, a_double, a_unsigned_float, a_unsigned_double)",
- "CREATE VIEW v AS SELECT * from aTable where a_string = 'a'");
+ "CREATE VIEW v AS SELECT * from aTable where a_string = 'a'",
+ "CREATE SEQUENCE seq START WITH 1 INCREMENT BY 1");
final Connection connection = DriverManager.getConnection(url);
connection.createStatement().execute("UPDATE STATISTICS ATABLE");
connection.createStatement().execute("UPDATE STATISTICS " + JOIN_CUSTOMER_TABLE_FULL_NAME);
@@ -284,6 +286,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
try {
conn.createStatement().execute(ddl);
} catch (TableAlreadyExistsException e) {
+ } catch (SequenceAlreadyExistsException e) {
}
}
conn.close();
@@ -1697,6 +1700,45 @@ public class CalciteIT extends BaseClientManagedTimeIT {
{"5", 6}})
.close();
}
+
+ /**
+ * Exercises NEXT VALUE FOR on top of a VALUES clause, a filtered table scan and a server join.
+ * All three queries read the same sequence "seq", so each expected result continues the
+ * numbering where the previous query stopped; the statements must stay in this order.
+ */
+ @Test public void testSequence() throws Exception {
+ // Two input rows: expects sequence values 1 and 2.
+ start(false).sql("select NEXT VALUE FOR seq, c0 from (values (1), (1)) as t(c0)")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixClientProject(EXPR$0=[NEXT_VALUE('\"SEQ\"')], C0=[$0])\n" +
+ " PhoenixValues(tuples=[[{ 1 }, { 1 }]])\n")
+ .resultIs(new Object[][]{
+ {1L, 1},
+ {2L, 1}})
+ .close();
+
+ // Four matching rows: expects sequence values 3 through 6.
+ start(false).sql("select NEXT VALUE FOR seq, entity_id from aTable where a_string = 'a'")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixClientProject(EXPR$0=[NEXT_VALUE('\"SEQ\"')], ENTITY_ID=[$1])\n" +
+ " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n")
+ .resultIs(new Object[][]{
+ {3L, "00A123122312312"},
+ {4L, "00A223122312312"},
+ {5L, "00A323122312312"},
+ {6L, "00A423122312312"}})
+ .close();
+
+ // Six joined rows: expects sequence values 7 through 12; the sequence call stays in a client project above the server join.
+ start(false).sql("SELECT NEXT VALUE FOR seq, item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\"")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixClientProject(EXPR$0=[NEXT_VALUE('\"SEQ\"')], item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" +
+ " PhoenixServerJoin(condition=[=($2, $3)], joinType=[inner])\n" +
+ " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" +
+ " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" +
+ " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" +
+ " PhoenixTableScan(table=[[phoenix, Join, SupplierTable]])\n")
+ .resultIs(new Object[][] {
+ {7L, "0000000001", "T1", "0000000001", "S1"},
+ {8L, "0000000002", "T2", "0000000001", "S1"},
+ {9L, "0000000003", "T3", "0000000002", "S2"},
+ {10L, "0000000004", "T4", "0000000002", "S2"},
+ {11L, "0000000005", "T5", "0000000005", "S5"},
+ {12L, "0000000006", "T6", "0000000006", "S6"}})
+ .close();
+ }
/** Tests a simple command that is defined in Phoenix's extended SQL parser.
* @throws Exception */
http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index 3c1fdaa..15df943 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -7,18 +7,22 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.calcite.avatica.util.ByteString;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.sql.SemiJoinType;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunction;
@@ -27,6 +31,7 @@ import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.Util;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.calcite.rel.PhoenixRel.Implementor;
@@ -82,6 +87,7 @@ import org.apache.phoenix.expression.function.SumAggregateFunction;
import org.apache.phoenix.expression.function.TrimFunction;
import org.apache.phoenix.expression.function.UpperFunction;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.parse.SequenceValueParseNode;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TypeMismatchException;
@@ -775,6 +781,28 @@ public class CalciteUtils {
}
}
});
+ // CURRENT VALUE FOR <sequence>: operand 0 is a string literal carrying the sequence name.
+ // The string is split into name parts with Util.stringToList, looked up through the
+ // thread-local catalog reader and unwrapped to the PhoenixSequence it represents.
+ // NOTE(review): the lookup result is dereferenced without a null check -- confirm the
+ // name is always resolvable by the time expressions are built.
+ EXPRESSION_MAP.put(SqlKind.CURRENT_VALUE, new ExpressionFactory() {
+ @Override
+ public Expression newExpression(RexNode node, Implementor implementor) {
+ RexCall call = (RexCall) node;
+ RexLiteral operand = (RexLiteral) call.getOperands().get(0);
+ List<String> name = Util.stringToList((String) operand.getValue2());
+ RelOptTable table = Prepare.CatalogReader.THREAD_LOCAL.get().getTable(name);
+ PhoenixSequence seq = table.unwrap(PhoenixSequence.class);
+ return implementor.newSequenceExpression(seq, SequenceValueParseNode.Op.CURRENT_VALUE);
+ }
+ });
+ // NEXT VALUE FOR <sequence>: same name resolution as for CURRENT VALUE; only the
+ // operation handed to the implementor differs.
+ EXPRESSION_MAP.put(SqlKind.NEXT_VALUE, new ExpressionFactory() {
+ @Override
+ public Expression newExpression(RexNode node, Implementor implementor) {
+ RexCall call = (RexCall) node;
+ RexLiteral operand = (RexLiteral) call.getOperands().get(0);
+ List<String> name = Util.stringToList((String) operand.getValue2());
+ RelOptTable table = Prepare.CatalogReader.THREAD_LOCAL.get().getTable(name);
+ PhoenixSequence seq = table.unwrap(PhoenixSequence.class);
+ return implementor.newSequenceExpression(seq, SequenceValueParseNode.Op.NEXT_VALUE);
+ }
+ });
// TODO: SqlKind.CASE
}
@@ -916,4 +944,49 @@ public class CalciteUtils {
public static interface FunctionFactory {
public FunctionExpression newFunction(SqlFunction sqlFunc, List<Expression> args);
}
+
+    /**
+     * Tells whether {@link SequenceValueFinder} detects a CURRENT VALUE / NEXT VALUE call
+     * in any projection expression of the given project.
+     *
+     * @param project the project whose expressions are inspected
+     * @return {@code true} as soon as one expression yields a detected call, {@code false} otherwise
+     */
+    public static boolean hasSequenceValueCall(Project project) {
+        final SequenceValueFinder finder = new SequenceValueFinder();
+        boolean found = false;
+        for (RexNode expr : project.getProjects()) {
+            expr.accept(finder);
+            found = finder.sequenceValueCall != null;
+            if (found) {
+                // No need to look at the remaining expressions.
+                break;
+            }
+        }
+        return found;
+    }
+
+    /**
+     * Resolves the sequence referenced by the first CURRENT VALUE / NEXT VALUE call that
+     * {@link SequenceValueFinder} detects among the project's expressions.
+     *
+     * @param project the project whose expressions are inspected
+     * @return the referenced {@link PhoenixSequence}, or {@code null} when no call is detected
+     */
+    public static PhoenixSequence findSequence(Project project) {
+        final SequenceValueFinder finder = new SequenceValueFinder();
+        RexCall seqCall = null;
+        for (RexNode expr : project.getProjects()) {
+            expr.accept(finder);
+            seqCall = finder.sequenceValueCall;
+            if (seqCall != null) {
+                break;
+            }
+        }
+        if (seqCall == null) {
+            return null;
+        }
+
+        // Operand 0 is a string literal holding the sequence name; split it into
+        // name parts and look the sequence up through the thread-local catalog reader.
+        final RexLiteral nameLiteral = (RexLiteral) seqCall.getOperands().get(0);
+        final List<String> qualifiedName = Util.stringToList((String) nameLiteral.getValue2());
+        final RelOptTable resolved = Prepare.CatalogReader.THREAD_LOCAL.get().getTable(qualifiedName);
+        return resolved.unwrap(PhoenixSequence.class);
+    }
+
+    /**
+     * Rex visitor that remembers the first CURRENT VALUE / NEXT VALUE call it encounters,
+     * including calls nested inside other calls (for example {@code NEXT VALUE FOR seq + 1}).
+     */
+    private static class SequenceValueFinder extends RexVisitorImpl<Void> {
+        /** First sequence value call found so far, or {@code null} if none was seen yet. */
+        private RexCall sequenceValueCall;
+
+        private SequenceValueFinder() {
+            // Deep traversal: operands of visited calls are visited as well.
+            super(true);
+        }
+
+        @Override
+        public Void visitCall(RexCall call) {
+            if (sequenceValueCall != null) {
+                // Only the first match is of interest; stop descending once found.
+                return null;
+            }
+            if (call.getKind() == SqlKind.CURRENT_VALUE
+                    || call.getKind() == SqlKind.NEXT_VALUE) {
+                sequenceValueCall = call;
+                return null;
+            }
+            // The operand traversal lives in RexVisitorImpl.visitCall; without delegating,
+            // a sequence call wrapped in another call would never be reached.
+            return super.visitCall(call);
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
index 0c45a25..6ef29e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
@@ -10,6 +10,12 @@ import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.materialize.MaterializationService;
import org.apache.calcite.schema.*;
import org.apache.calcite.schema.impl.ViewTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -17,6 +23,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.parse.ColumnDef;
import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
@@ -28,6 +35,7 @@ import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.IndexUtil;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
@@ -62,6 +70,7 @@ public class PhoenixSchema implements Schema {
protected final Map<String, PTable> tableMap;
protected final Map<String, ViewDef> viewDefMap;
protected final Map<String, Function> functionMap;
+ protected final Map<String, PhoenixSequence> sequenceMap;
private PhoenixSchema(String name, String schemaName, PhoenixConnection pc) {
this.name = name;
@@ -71,7 +80,9 @@ public class PhoenixSchema implements Schema {
this.tableMap = Maps.<String, PTable> newHashMap();
this.viewDefMap = Maps.<String, ViewDef> newHashMap();
this.functionMap = Maps.<String, Function> newHashMap();
+ this.sequenceMap = Maps.<String, PhoenixSequence> newHashMap();
loadTables();
+ loadSequences();
this.subSchemaNames = schemaName == null ?
ImmutableSet.<String> copyOf(loadSubSchemaNames())
: Collections.<String> emptySet();
@@ -159,6 +170,49 @@ public class PhoenixSchema implements Schema {
table.isWALDisabled(), false, table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
}
+
+    /**
+     * Populates {@code sequenceMap} with the sequences belonging to this schema by scanning
+     * the row keys of the Phoenix sequence table.
+     *
+     * @throws RuntimeException if the table cannot be obtained or scanned, or if a row key
+     *         does not contain the expected separators
+     */
+    private void loadSequences() {
+        // Number of leading, separator-terminated key components skipped before the schema name.
+        final int nSeparators = 1; //pc.getQueryServices().getSequenceSaltBuckets() <= 0 ? 1 : 2;
+        final byte[] separator = QueryConstants.SEPARATOR_BYTE_ARRAY;
+        HTableInterface hTable = null;
+        ResultScanner scanner = null;
+        try {
+            hTable = pc.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
+            Scan scan = new Scan();
+            // Only next.getRow() is used below, hence the key-only filter.
+            scan.setFilter(new KeyOnlyFilter());
+            scanner = hTable.getScanner(scan);
+            for (Result next = scanner.next(); next != null; next = scanner.next()) {
+                final byte[] rowKey = next.getRow();
+                byte[] key = rowKey;
+                int nSkipped = 0;
+                while (nSkipped < nSeparators) {
+                    int index = Bytes.indexOf(key, separator);
+                    if (index < 0) {
+                        break;
+                    }
+                    nSkipped++;
+                    int offset = index + separator.length;
+                    key = Bytes.copy(key, offset, key.length - offset);
+                }
+                // What remains must be <schema><separator><sequence name>.
+                int index = nSkipped == nSeparators ? Bytes.indexOf(key, separator) : -1;
+                if (index < 0) {
+                    // Report the full row key in printable form; concatenating a byte[]
+                    // directly would only print its identity hash.
+                    throw new RuntimeException(
+                            "Unrecognized sequence key: '" + Bytes.toStringBinary(rowKey) + "'");
+                }
+                // An empty schema component matches the schema-less (root) PhoenixSchema.
+                if ((schemaName == null && index == 0)
+                        || (schemaName != null && schemaName.equals(Bytes.toString(key, 0, index)))) {
+                    int offset = index + separator.length;
+                    String sequenceName = Bytes.toString(key, offset, key.length - offset);
+                    sequenceMap.put(sequenceName, new PhoenixSequence(schemaName, sequenceName, pc));
+                }
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            // Release the scanner and the table handle on every path, including failures.
+            if (scanner != null) {
+                scanner.close();
+            }
+            if (hTable != null) {
+                try {
+                    hTable.close();
+                } catch (IOException ignored) {
+                    // Best effort: a failure to close must not mask the scan outcome.
+                }
+            }
+        }
+    }
private static Schema create(String name, Map<String, Object> operand) {
String url = (String) operand.get("url");
@@ -183,12 +237,16 @@ public class PhoenixSchema implements Schema {
@Override
public Table getTable(String name) {
+ // Tables take precedence: the sequence map is consulted only when no table
+ // is registered under this name.
PTable table = tableMap.get(name);
- return table == null ? null : new PhoenixTable(pc, table);
+ if (table != null) {
+ return new PhoenixTable(pc, table);
+ }
+ // Null when the name matches neither a table nor a sequence.
+ PhoenixSequence sequence = sequenceMap.get(name);
+ return sequence;
}
@Override
public Set<String> getTableNames() {
- return tableMap.keySet();
+ // Sequence names are reported together with table names, matching getTable,
+ // which resolves both kinds of object.
+ return Sets.union(tableMap.keySet(), sequenceMap.keySet());
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSequence.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSequence.java
new file mode 100644
index 0000000..e633c5e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSequence.java
@@ -0,0 +1,42 @@
+package org.apache.phoenix.calcite;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptTable.ToRelContext;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+/**
+ * Calcite table object standing in for a Phoenix sequence, identified by its schema name,
+ * sequence name and the connection it was loaded through.
+ */
+public class PhoenixSequence extends AbstractTable implements TranslatableTable {
+    public final String schemaName;
+    public final String sequenceName;
+    public final PhoenixConnection pc;
+
+    public PhoenixSequence(String schemaName, String sequenceName, PhoenixConnection pc) {
+        this.pc = pc;
+        this.sequenceName = sequenceName;
+        this.schemaName = schemaName;
+    }
+
+    /** Describes the sequence as a single BIGINT column named CURRENT_VALUE. */
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+        return typeFactory.builder()
+                .add("CURRENT_VALUE", typeFactory.createSqlType(SqlTypeName.BIGINT))
+                .build();
+    }
+
+    /** Always yields {@code null}; no relational expression is produced for a sequence. */
+    @Override
+    public RelNode toRel(ToRelContext context, RelOptTable relOptTable) {
+        return null;
+    }
+
+    /** Reports this object as a sequence rather than a regular table. */
+    @Override
+    public Schema.TableType getJdbcTableType() {
+        return Schema.TableType.SEQUENCE;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
index 94552ab..304c0e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
@@ -1,5 +1,6 @@
package org.apache.phoenix.calcite.rel;
+import java.sql.SQLException;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
@@ -12,9 +13,19 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMdCollation;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.PhoenixSequence;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.ClientScanPlan;
import org.apache.phoenix.execute.TupleProjectionPlan;
import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.schema.Sequence;
import com.google.common.base.Supplier;
@@ -47,8 +58,7 @@ public class PhoenixClientProject extends PhoenixAbstractProject {
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- if (!getInput().getConvention().satisfies(PhoenixConvention.SERVERJOIN)
- && !getInput().getConvention().satisfies(PhoenixConvention.CLIENT))
+ if (!getInput().getConvention().satisfies(PhoenixConvention.GENERIC))
return planner.getCostFactory().makeInfiniteCost();
return super.computeSelfCost(planner)
@@ -61,7 +71,27 @@ public class PhoenixClientProject extends PhoenixAbstractProject {
QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
implementor.popContext();
+
+ PhoenixSequence sequence = CalciteUtils.findSequence(this);
+ final SequenceManager seqManager = sequence == null ?
+ null : new SequenceManager(new PhoenixStatement(sequence.pc));
+ implementor.setSequenceManager(seqManager);
TupleProjector tupleProjector = project(implementor);
+ if (seqManager != null) {
+ try {
+ seqManager.validateSequences(Sequence.ValueOp.VALIDATE_SEQUENCE);
+ StatementContext context = new StatementContext(
+ plan.getContext().getStatement(),
+ plan.getContext().getResolver(),
+ new Scan(), seqManager);
+ plan = new ClientScanPlan(
+ context, plan.getStatement(), plan.getTableRef(),
+ RowProjector.EMPTY_PROJECTOR, null, null,
+ OrderBy.EMPTY_ORDER_BY, plan);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
return new TupleProjectionPlan(plan, tupleProjector, null);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
index 796ea00..0f37bc1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
@@ -6,12 +6,15 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.RelOptUtil.InputFinder;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.metadata.RelMdCollation;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.phoenix.calcite.CalciteUtils;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
@@ -56,7 +59,12 @@ public class PhoenixFilter extends Filter implements PhoenixRel {
}
public QueryPlan implement(Implementor implementor) {
+ ImmutableIntList columnRefList = implementor.getCurrentContext().columnRefList;
+ ImmutableBitSet bitSet = InputFinder.analyze(condition).inputBitSet.addAll(columnRefList).build();
+ columnRefList = ImmutableIntList.copyOf(bitSet.asList());
+ implementor.pushContext(implementor.getCurrentContext().withColumnRefList(columnRefList));
QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ implementor.popContext();
Expression expr = CalciteUtils.toExpression(condition, implementor);
return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(),
plan.getProjector(), null, expr, OrderBy.EMPTY_ORDER_BY, plan);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
index 92d8ad0..a15ceb0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
@@ -5,13 +5,17 @@ import java.util.List;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.util.ImmutableIntList;
+import org.apache.phoenix.calcite.PhoenixSequence;
import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.SequenceValueExpression;
import org.apache.phoenix.execute.RuntimeContext;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.ColumnExpression;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.SequenceValueParseNode;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PDataType;
@@ -66,9 +70,11 @@ public interface PhoenixRel extends RelNode {
ColumnExpression newColumnExpression(int index);
@SuppressWarnings("rawtypes")
Expression newFieldAccessExpression(String variableId, int index, PDataType type);
+ SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op);
RuntimeContext getRuntimeContext();
void setTableRef(TableRef tableRef);
TableRef getTableRef();
+ void setSequenceManager(SequenceManager sequenceManager);
void pushContext(ImplementorContext context);
ImplementorContext popContext();
ImplementorContext getCurrentContext();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
index cd6f599..341342c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
@@ -7,12 +7,15 @@ import java.util.Stack;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.PhoenixSequence;
import org.apache.phoenix.calcite.PhoenixTable;
import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ExpressionProjector;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.SequenceValueExpression;
import org.apache.phoenix.compile.TupleProjectionCompiler;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.execute.RuntimeContext;
@@ -21,6 +24,8 @@ import org.apache.phoenix.expression.ColumnExpression;
import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.SequenceValueParseNode;
+import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
@@ -42,6 +47,7 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
private TableRef tableRef;
private List<PColumn> mappedColumns;
private Stack<ImplementorContext> contextStack;
+ private SequenceManager sequenceManager;
public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) {
this.runtimeContext = runtimeContext;
@@ -67,6 +73,17 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
}
@Override
+    // Builds the expression for a CURRENT VALUE / NEXT VALUE reference to the given sequence,
+    // registering it with the SequenceManager installed through setSequenceManager.
+    // Throws IllegalStateException when no SequenceManager is installed, and RuntimeException
+    // (wrapping the SQLException) when the manager rejects the reference.
+    public SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op) {
+        if (sequenceManager == null) {
+            // The manager is installed by the enclosing rel node and may legitimately be null;
+            // fail with a descriptive error instead of a bare NullPointerException.
+            throw new IllegalStateException("No SequenceManager set; cannot reference sequence "
+                    + seq.schemaName + "." + seq.sequenceName);
+        }
+        PName tenantName = seq.pc.getTenantId();
+        TableName tableName = TableName.create(seq.schemaName, seq.sequenceName);
+        try {
+            // No explicit allocation count is available here, hence the null third argument.
+            return sequenceManager.newSequenceReference(tenantName, tableName, null, op);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+ @Override
public RuntimeContext getRuntimeContext() {
return runtimeContext;
}
@@ -81,6 +98,11 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
public TableRef getTableRef() {
return this.tableRef;
}
+
+ // Installs the manager used by newSequenceExpression. The argument may be null:
+ // PhoenixClientProject passes null when it finds no sequence call in its projections.
+ @Override
+ public void setSequenceManager(SequenceManager sequenceManager) {
+ this.sequenceManager = sequenceManager;
+ }
@Override
public void pushContext(ImplementorContext context) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index 84b4a60..3fd9b04 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -81,8 +81,7 @@ public class PhoenixConverterRules {
PhoenixServerSortRule.SERVERJOIN,
PhoenixLimitRule.INSTANCE,
PhoenixFilterRule.INSTANCE,
- PhoenixClientProjectRule.SERVERJOIN,
- PhoenixClientProjectRule.CLIENT,
+ PhoenixClientProjectRule.INSTANCE,
PhoenixServerProjectRule.INSTANCE,
PhoenixClientAggregateRule.INSTANCE,
PhoenixServerAggregateRule.SERVER,
@@ -106,8 +105,7 @@ public class PhoenixConverterRules {
PhoenixServerSortRule.SERVERJOIN,
PhoenixLimitRule.INSTANCE,
PhoenixFilterRule.CONVERTIBLE,
- PhoenixClientProjectRule.CONVERTIBLE_SERVERJOIN,
- PhoenixClientProjectRule.CONVERTIBLE_CLIENT,
+ PhoenixClientProjectRule.CONVERTIBLE,
PhoenixServerProjectRule.CONVERTIBLE,
PhoenixClientAggregateRule.CONVERTIBLE,
PhoenixServerAggregateRule.CONVERTIBLE_SERVER,
@@ -301,30 +299,14 @@ public class PhoenixConverterRules {
}
};
- private static final PhoenixClientProjectRule SERVERJOIN =
- new PhoenixClientProjectRule(
- Predicates.<LogicalProject>alwaysTrue(),
- PhoenixConvention.SERVERJOIN);
- private static final PhoenixClientProjectRule CLIENT =
- new PhoenixClientProjectRule(
- Predicates.<LogicalProject>alwaysTrue(),
- PhoenixConvention.CLIENT);
- private static final PhoenixClientProjectRule CONVERTIBLE_SERVERJOIN =
- new PhoenixClientProjectRule(
- IS_CONVERTIBLE,
- PhoenixConvention.SERVERJOIN);
- private static final PhoenixClientProjectRule CONVERTIBLE_CLIENT =
- new PhoenixClientProjectRule(
- IS_CONVERTIBLE,
- PhoenixConvention.CLIENT);
+ private static final PhoenixClientProjectRule INSTANCE =
+ new PhoenixClientProjectRule(Predicates.<LogicalProject>alwaysTrue());
+ private static final PhoenixClientProjectRule CONVERTIBLE =
+ new PhoenixClientProjectRule(IS_CONVERTIBLE);
- private final Convention inputConvention;
-
- private PhoenixClientProjectRule(Predicate<LogicalProject> predicate, Convention inputConvention) {
+ private PhoenixClientProjectRule(Predicate<LogicalProject> predicate) {
super(LogicalProject.class, predicate, Convention.NONE,
- PhoenixConvention.CLIENT,
- "PhoenixClientProjectRule:" + inputConvention);
- this.inputConvention = inputConvention;
+ PhoenixConvention.CLIENT, "PhoenixClientProjectRule");
}
public RelNode convert(RelNode rel) {
@@ -332,7 +314,7 @@ public class PhoenixConverterRules {
return PhoenixClientProject.create(
convert(
project.getInput(),
- project.getInput().getTraitSet().replace(inputConvention)),
+ project.getInput().getTraitSet().replace(PhoenixConvention.GENERIC)),
project.getProjects(),
project.getRowType());
}
@@ -351,9 +333,16 @@ public class PhoenixConverterRules {
}
};
- private static final PhoenixServerProjectRule INSTANCE = new PhoenixServerProjectRule(Predicates.<LogicalProject>alwaysTrue());
+ // Accepts only projects for which CalciteUtils.hasSequenceValueCall reports no sequence
+ // value call, so that projects using sequences are not converted by the server project rule.
+ private static Predicate<LogicalProject> NO_SEQUENCE = new Predicate<LogicalProject>() {
+ @Override
+ public boolean apply(LogicalProject input) {
+ return !CalciteUtils.hasSequenceValueCall(input);
+ }
+ };
+
+ private static final PhoenixServerProjectRule INSTANCE = new PhoenixServerProjectRule(NO_SEQUENCE);
- private static final PhoenixServerProjectRule CONVERTIBLE = new PhoenixServerProjectRule(IS_CONVERTIBLE);
+ private static final PhoenixServerProjectRule CONVERTIBLE = new PhoenixServerProjectRule(Predicates.and(NO_SEQUENCE, IS_CONVERTIBLE));
private PhoenixServerProjectRule(Predicate<LogicalProject> predicate) {
super(LogicalProject.class, predicate, Convention.NONE,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
index 5ec8cd2..c24ab57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@ -119,25 +119,29 @@ public class SequenceManager {
return dstSequenceValues[index];
}
}
-
+
public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) throws SQLException {
+ // Parse-node entry point: extracts tenant, table name, allocation node and operation
+ // from the node and the statement's connection, then delegates to the overload below.
PName tenantName = statement.getConnection().getTenantId();
- String tenantId = tenantName == null ? null : tenantName.getString();
TableName tableName = node.getTableName();
+ ParseNode numToAllocateNode = node.getNumToAllocateNode();
+ return newSequenceReference(tenantName, tableName, numToAllocateNode, node.getOp());
+ }
+
+ public SequenceValueExpression newSequenceReference(PName tenantName,
+ TableName tableName, ParseNode numToAllocateNode, SequenceValueParseNode.Op op) throws SQLException {
+ String tenantId = tenantName == null ? null : tenantName.getString();
int nSaltBuckets = statement.getConnection().getQueryServices().getSequenceSaltBuckets();
- ParseNode numToAllocateNode = node.getNumToAllocateNode();
-
long numToAllocate = determineNumToAllocate(tableName, numToAllocateNode);
SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(), tableName.getTableName(), nSaltBuckets);
SequenceValueExpression expression = sequenceMap.get(key);
if (expression == null) {
int index = sequenceMap.size();
- expression = new SequenceValueExpression(key, node.getOp(), index, numToAllocate);
+ expression = new SequenceValueExpression(key, op, index, numToAllocate);
sequenceMap.put(key, expression);
- } else if (expression.op != node.getOp() || expression.getNumToAllocate() < numToAllocate) {
+ } else if (expression.op != op || expression.getNumToAllocate() < numToAllocate) {
// Keep the maximum allocation size we see in a statement
SequenceValueExpression oldExpression = expression;
- expression = new SequenceValueExpression(key, node.getOp(), expression.getIndex(), Math.max(expression.getNumToAllocate(), numToAllocate));
+ expression = new SequenceValueExpression(key, op, expression.getIndex(), Math.max(expression.getNumToAllocate(), numToAllocate));
if (oldExpression.getNumToAllocate() < numToAllocate) {
// If we found a NEXT VALUE expression with a higher number to allocate
// We override the original expression
@@ -145,7 +149,7 @@ public class SequenceManager {
}
}
// If we see a NEXT and a CURRENT, treat the CURRENT just like a NEXT
- if (node.getOp() == Op.NEXT_VALUE) {
+ if (op == Op.NEXT_VALUE) {
isNextSequence.set(expression.getIndex());
}