You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/06/22 20:20:21 UTC
[1/2] beam git commit: restrict the scope of BeamSqlEnv
Repository: beam
Updated Branches:
refs/heads/DSL_SQL 887cf3a1a -> a680904a4
restrict the scope of BeamSqlEnv
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5db2777
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5db2777
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5db2777
Branch: refs/heads/DSL_SQL
Commit: c5db2777a4cc7d6d230fba74e7c12fd18fcc07c3
Parents: 887cf3a
Author: mingmxu <mi...@ebay.com>
Authored: Fri Jun 16 18:49:18 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Jun 22 13:18:12 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/dsls/sql/BeamSql.java | 57 +++++++++++---------
.../org/apache/beam/dsls/sql/BeamSqlCli.java | 18 +++----
.../org/apache/beam/dsls/sql/BeamSqlEnv.java | 12 ++---
.../beam/dsls/sql/example/BeamSqlExample.java | 4 +-
.../math/BeamSqlMathBinaryExpression.java | 1 -
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 7 +--
.../beam/dsls/sql/rel/BeamAggregationRel.java | 7 +--
.../apache/beam/dsls/sql/rel/BeamFilterRel.java | 8 +--
.../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 9 ++--
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 7 ++-
.../beam/dsls/sql/rel/BeamIntersectRel.java | 8 +--
.../apache/beam/dsls/sql/rel/BeamMinusRel.java | 8 +--
.../beam/dsls/sql/rel/BeamProjectRel.java | 7 +--
.../apache/beam/dsls/sql/rel/BeamRelNode.java | 7 +--
.../dsls/sql/rel/BeamSetOperatorRelBase.java | 10 ++--
.../apache/beam/dsls/sql/rel/BeamSortRel.java | 7 +--
.../apache/beam/dsls/sql/rel/BeamUnionRel.java | 8 +--
.../apache/beam/dsls/sql/rel/BeamValuesRel.java | 6 +--
.../beam/dsls/sql/utils/CalciteUtils.java | 1 -
.../beam/dsls/sql/rel/BeamIntersectRelTest.java | 10 ++--
.../beam/dsls/sql/rel/BeamMinusRelTest.java | 10 ++--
.../sql/rel/BeamSetOperatorRelBaseTest.java | 8 +--
.../beam/dsls/sql/rel/BeamSortRelTest.java | 26 ++++-----
.../beam/dsls/sql/rel/BeamUnionRelTest.java | 8 +--
.../beam/dsls/sql/rel/BeamValuesRelTest.java | 12 +++--
25 files changed, 141 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index 04fe055..e68188b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -17,9 +17,6 @@
*/
package org.apache.beam.dsls.sql;
-import static org.apache.beam.dsls.sql.BeamSqlEnv.planner;
-import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable;
-
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -47,17 +44,15 @@ PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
//create table from TextIO;
-TableSchema tableASchema = ...;
PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha"))
- .apply(BeamSql.fromTextRow(tableASchema));
-TableSchema tableBSchema = ...;
+ .apply(...);
PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb"))
- .apply(BeamSql.fromTextRow(tableBSchema));
+ .apply(...);
//run a simple query, and register the output as a table in BeamSql;
String sql1 = "select MY_FUNC(c1), c2 from TABLE_A";
-PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1))
- .withUdf("MY_FUNC", myFunc);
+PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)
+ .withUdf("MY_FUNC", myFunc));
//run a JOIN with one table from TextIO, and one table from another query
PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of(
@@ -107,35 +102,47 @@ public class BeamSql {
*/
private static class QueryTransform extends
PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
+ private BeamSqlEnv sqlEnv;
private String sqlQuery;
+
public QueryTransform(String sqlQuery) {
this.sqlQuery = sqlQuery;
+ sqlEnv = new BeamSqlEnv();
+ }
+
+ public QueryTransform(String sqlQuery, BeamSqlEnv sqlEnv) {
+ this.sqlQuery = sqlQuery;
+ this.sqlEnv = sqlEnv;
}
@Override
public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
- //register tables
- for (TupleTag<?> sourceTag : input.getAll().keySet()) {
- PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
- BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
-
- registerTable(sourceTag.getId(),
- new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema()));
- }
+ registerTables(input);
BeamRelNode beamRelNode = null;
try {
- beamRelNode = planner.convertToBeamRel(sqlQuery);
+ beamRelNode = sqlEnv.planner.convertToBeamRel(sqlQuery);
} catch (ValidationException | RelConversionException | SqlParseException e) {
throw new IllegalStateException(e);
}
try {
- return beamRelNode.buildBeamPipeline(input);
+ return beamRelNode.buildBeamPipeline(input, sqlEnv);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
+
+ // Register each input PCollection as a table, named by its TupleTag id.
+ private void registerTables(PCollectionTuple input){
+ for (TupleTag<?> sourceTag : input.getAll().keySet()) {
+ PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
+ BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
+
+ sqlEnv.registerTable(sourceTag.getId(),
+ new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema()));
+ }
+ }
}
/**
@@ -144,21 +151,19 @@ public class BeamSql {
*/
private static class SimpleQueryTransform
extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
+ BeamSqlEnv sqlEnv = new BeamSqlEnv();
private String sqlQuery;
+
public SimpleQueryTransform(String sqlQuery) {
this.sqlQuery = sqlQuery;
}
- public SimpleQueryTransform withUdf(String udfName){
- throw new UnsupportedOperationException("Pending for UDF support");
- }
-
@Override
public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
SqlNode sqlNode;
try {
- sqlNode = planner.parseQuery(sqlQuery);
- planner.getPlanner().close();
+ sqlNode = sqlEnv.planner.parseQuery(sqlQuery);
+ sqlEnv.planner.getPlanner().close();
} catch (SqlParseException e) {
throw new IllegalStateException(e);
}
@@ -167,7 +172,7 @@ public class BeamSql {
SqlSelect select = (SqlSelect) sqlNode;
String tableName = select.getFrom().toString();
return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input)
- .apply(BeamSql.query(sqlQuery));
+ .apply(new QueryTransform(sqlQuery, sqlEnv));
} else {
throw new UnsupportedOperationException(
"Sql operation: " + sqlNode.toString() + " is not supported!");
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
index dbf9a59..50da244 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.dsls.sql;
-import static org.apache.beam.dsls.sql.BeamSqlEnv.planner;
-
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.Pipeline;
@@ -33,12 +31,11 @@ import org.apache.calcite.plan.RelOptUtil;
*/
@Experimental
public class BeamSqlCli {
-
/**
* Returns a human readable representation of the query execution plan.
*/
- public static String explainQuery(String sqlString) throws Exception {
- BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
+ public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception {
+ BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString);
String beamPlan = RelOptUtil.toString(exeTree);
return beamPlan;
}
@@ -46,22 +43,23 @@ public class BeamSqlCli {
/**
* compile SQL, and return a {@link Pipeline}.
*/
- public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement) throws Exception{
+ public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
+ throws Exception{
PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
.as(PipelineOptions.class); // FlinkPipelineOptions.class
options.setJobName("BeamPlanCreator");
Pipeline pipeline = Pipeline.create(options);
- return compilePipeline(sqlStatement, pipeline);
+ return compilePipeline(sqlStatement, pipeline, sqlEnv);
}
/**
* compile SQL, and return a {@link Pipeline}.
*/
- public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline)
- throws Exception{
+ public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline
+ , BeamSqlEnv sqlEnv) throws Exception{
PCollection<BeamSqlRow> resultStream =
- planner.compileBeamPipeline(sqlStatement, basePipeline);
+ sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
return resultStream;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
index d7715c7..baa2617 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -42,10 +42,10 @@ import org.apache.calcite.tools.Frameworks;
* a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries.
*/
public class BeamSqlEnv {
- static SchemaPlus schema;
- static BeamQueryPlanner planner;
+ SchemaPlus schema;
+ BeamQueryPlanner planner;
- static {
+ public BeamSqlEnv() {
schema = Frameworks.createRootSchema(true);
planner = new BeamQueryPlanner(schema);
}
@@ -53,7 +53,7 @@ public class BeamSqlEnv {
/**
* Register a UDF function which can be used in SQL expression.
*/
- public static void registerUdf(String functionName, Class<?> clazz, String methodName) {
+ public void registerUdf(String functionName, Class<?> clazz, String methodName) {
schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName));
}
@@ -61,7 +61,7 @@ public class BeamSqlEnv {
* Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
*
*/
- public static void registerTable(String tableName, BaseBeamTable table) {
+ public void registerTable(String tableName, BaseBeamTable table) {
schema.add(tableName, new BeamCalciteTable(table.getRecordType()));
planner.getSourceTables().put(tableName, table);
}
@@ -69,7 +69,7 @@ public class BeamSqlEnv {
/**
* Find {@link BaseBeamTable} by table name.
*/
- public static BaseBeamTable findTable(String tableName){
+ public BaseBeamTable findTable(String tableName){
return planner.getSourceTables().get(tableName);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 31f8302..5f09fdd 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -63,13 +63,13 @@ class BeamSqlExample {
//Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
PCollection<BeamSqlRow> outputStream = inputTable.apply(
- BeamSql.simpleQuery("select c2, c3 from TABLE_A where c1=1"));
+ BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1=1"));
//log out the output record;
outputStream.apply("log_result",
MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
public Void apply(BeamSqlRow input) {
- System.out.println("TABLE_A: " + input);
+ System.out.println("PCOLLECTION: " + input);
return null;
}
}));
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
index 11b867a..f79bcf6 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
@@ -57,7 +57,6 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression {
/**
* The method to check whether operands are numeric or not.
- * @param opType
*/
public boolean isOperandNumeric(SqlTypeName opType) {
return SqlTypeName.NUMERIC_TYPES.contains(opType);
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 2eaf9e7..6ae8a1e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
@@ -106,12 +107,12 @@ public class BeamQueryPlanner {
* which is linked with the given {@code pipeline}. The final output stream is returned as
* {@code PCollection} so more operations can be applied.
*/
- public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline)
- throws Exception {
+ public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline
+ , BeamSqlEnv sqlEnv) throws Exception {
BeamRelNode relNode = convertToBeamRel(sqlStatement);
// the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel.
- return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline));
+ return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index bcdc44f..7a1d003a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -19,6 +19,7 @@ package org.apache.beam.dsls.sql.rel;
import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
@@ -72,13 +73,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
}
@Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
- throws Exception {
+ public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
RelNode input = getInput();
String stageName = BeamSqlRelUtils.getStageName(this);
PCollection<BeamSqlRow> upstream =
- BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+ BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
if (windowFieldIdx != -1) {
upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps
.<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 40fe05c..07b5c7c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.dsls.sql.rel;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -49,14 +50,13 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
}
@Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
- throws Exception {
-
+ public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
RelNode input = getInput();
String stageName = BeamSqlRelUtils.getStageName(this);
PCollection<BeamSqlRow> upstream =
- BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+ BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
index 88fff63..58539f8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -56,18 +56,17 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
* which is the persisted PCollection.
*/
@Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
- throws Exception {
-
+ public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
RelNode input = getInput();
String stageName = BeamSqlRelUtils.getStageName(this);
PCollection<BeamSqlRow> upstream =
- BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+ BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
- BaseBeamTable targetTable = BeamSqlEnv.findTable(sourceName);
+ BaseBeamTable targetTable = sqlEnv.findTable(sourceName);
PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter());
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index ed2bf12..a664ce1 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -40,9 +40,8 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
}
@Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
- throws Exception {
-
+ public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
String stageName = BeamSqlRelUtils.getStageName(this);
@@ -55,7 +54,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
return sourceStream;
} else {
//If not, the source PColection is provided with BaseBeamTable.buildIOReader().
- BaseBeamTable sourceTable = BeamSqlEnv.findTable(sourceName);
+ BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
return sourceTable.buildIOReader(inputPCollections.getPipeline());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
index 01e1c33..7cab171 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
@@ -19,7 +19,7 @@
package org.apache.beam.dsls.sql.rel;
import java.util.List;
-
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -51,8 +51,8 @@ public class BeamIntersectRel extends Intersect implements BeamRelNode {
return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
}
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
- throws Exception {
- return delegate.buildBeamPipeline(inputPCollections);
+ @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
+ return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
index bee6c11..b558f4b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
@@ -19,7 +19,7 @@
package org.apache.beam.dsls.sql.rel;
import java.util.List;
-
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -49,8 +49,8 @@ public class BeamMinusRel extends Minus implements BeamRelNode {
return new BeamMinusRel(getCluster(), traitSet, inputs, all);
}
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
- throws Exception {
- return delegate.buildBeamPipeline(inputPCollections);
+ @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
+ return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index e6331c6..2cdfc72 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -18,6 +18,7 @@
package org.apache.beam.dsls.sql.rel;
import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -59,13 +60,13 @@ public class BeamProjectRel extends Project implements BeamRelNode {
}
@Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
- throws Exception {
+ public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
RelNode input = getInput();
String stageName = BeamSqlRelUtils.getStageName(this);
PCollection<BeamSqlRow> upstream =
- BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+ BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
index aed4b06..d4c98a3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
@@ -17,14 +17,14 @@
*/
package org.apache.beam.dsls.sql.rel;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.rel.RelNode;
/**
- * A new method {@link #buildBeamPipeline(PCollectionTuple)} is added, it's
- * called by {@code BeamQueryPlanner}.
+ * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added.
*/
public interface BeamRelNode extends RelNode {
@@ -33,5 +33,6 @@ public interface BeamRelNode extends RelNode {
* {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
* algorithm.
*/
- PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception;
+ PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
+ throws Exception;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
index 3d41e3a..939c9c8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.rel;
import java.io.Serializable;
import java.util.List;
-
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms;
import org.apache.beam.sdk.transforms.MapElements;
@@ -62,12 +62,12 @@ public class BeamSetOperatorRelBase {
this.all = all;
}
- public PCollection<BeamSqlRow> buildBeamPipeline(
- PCollectionTuple inputPCollections) throws Exception {
+ public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
- .buildBeamPipeline(inputPCollections);
+ .buildBeamPipeline(inputPCollections, sqlEnv);
PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
- .buildBeamPipeline(inputPCollections);
+ .buildBeamPipeline(inputPCollections, sqlEnv);
WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
index 6c7be0b..75f9717 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -24,6 +24,7 @@ import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
@@ -119,11 +120,11 @@ public class BeamSortRel extends Sort implements BeamRelNode {
}
}
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
- throws Exception {
+ @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
RelNode input = getInput();
PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
- .buildBeamPipeline(inputPCollections);
+ .buildBeamPipeline(inputPCollections, sqlEnv);
Type windowType = upstream.getWindowingStrategy().getWindowFn()
.getWindowTypeDescriptor().getType();
if (!windowType.equals(GlobalWindow.class)) {
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
index 63cf11a..c661585 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
@@ -19,7 +19,7 @@
package org.apache.beam.dsls.sql.rel;
import java.util.List;
-
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
@@ -81,8 +81,8 @@ public class BeamUnionRel extends Union implements BeamRelNode {
return new BeamUnionRel(getCluster(), traitSet, inputs, all);
}
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
- throws Exception {
- return delegate.buildBeamPipeline(inputPCollections);
+ @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
+ return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
index ce75768..030d2c8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
-
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
@@ -57,8 +57,8 @@ public class BeamValuesRel extends Values implements BeamRelNode {
}
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
- throws Exception {
+ @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
String stageName = BeamSqlRelUtils.getStageName(this);
if (tuples.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
index 69ca44b..ac395d3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
@@ -74,7 +74,6 @@ public class CalciteUtils {
/**
* Get the {@code SqlTypeName} for the specified column of a table.
- * @return
*/
public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) {
return toCalciteType(schema.getFieldsType().get(index));
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
index 02223c2..47fdc16 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
@@ -34,6 +34,8 @@ import org.junit.Test;
* Test for {@code BeamIntersectRel}.
*/
public class BeamIntersectRelTest {
+ static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
@Rule
public final TestPipeline pipeline = TestPipeline.create();
private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable
@@ -57,8 +59,8 @@ public class BeamIntersectRelTest {
@BeforeClass
public static void setUp() {
- BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
- BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+ sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
+ sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
}
@Test
@@ -70,7 +72,7 @@ public class BeamIntersectRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
MockedBeamSqlTable.of(
SqlTypeName.BIGINT, "order_id",
@@ -93,7 +95,7 @@ public class BeamIntersectRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).satisfies(new CheckSize(3));
PAssert.that(rows).containsInAnyOrder(
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
index cd6ba16..688ff8e 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
@@ -34,6 +34,8 @@ import org.junit.Test;
* Test for {@code BeamMinusRel}.
*/
public class BeamMinusRelTest {
+ static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
@Rule
public final TestPipeline pipeline = TestPipeline.create();
private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable
@@ -58,8 +60,8 @@ public class BeamMinusRelTest {
@Before
public void setUp() {
- BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
- BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+ sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
+ sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
MockedBeamSqlTable.CONTENT.clear();
}
@@ -72,7 +74,7 @@ public class BeamMinusRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
MockedBeamSqlTable.of(
SqlTypeName.BIGINT, "order_id",
@@ -93,7 +95,7 @@ public class BeamMinusRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).satisfies(new CheckSize(2));
PAssert.that(rows).containsInAnyOrder(
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
index 4936062..f10a767 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
@@ -42,6 +42,8 @@ import org.junit.Test;
* Test for {@code BeamSetOperatorRelBase}.
*/
public class BeamSetOperatorRelBaseTest {
+ static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
@Rule
public final TestPipeline pipeline = TestPipeline.create();
public static final Date THE_DATE = new Date();
@@ -57,7 +59,7 @@ public class BeamSetOperatorRelBaseTest {
@BeforeClass
public static void prepare() {
THE_DATE.setTime(100000);
- BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+ sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
}
@Test
@@ -71,7 +73,7 @@ public class BeamSetOperatorRelBaseTest {
+ "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+ ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
List<BeamSqlRow> expRows =
MockedBeamSqlTable.of(
SqlTypeName.BIGINT, "order_id",
@@ -100,7 +102,7 @@ public class BeamSetOperatorRelBaseTest {
// use a real pipeline rather than the TestPipeline because we are
// testing exceptions, the pipeline will not actually run.
Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
- BeamSqlCli.compilePipeline(sql, pipeline1);
+ BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv);
pipeline.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
index cfdbd53..2519984 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
@@ -36,6 +36,8 @@ import org.junit.Test;
* Test for {@code BeamSortRel}.
*/
public class BeamSortRelTest {
+ static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
@Rule
public final TestPipeline pipeline = TestPipeline.create();
@@ -69,7 +71,7 @@ public class BeamSortRelTest {
+ "ORDER BY order_id asc, site_id desc limit 4";
System.out.println(sql);
- BeamSqlCli.compilePipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
pipeline.run().waitUntilFinish();
assertEquals(
@@ -86,7 +88,7 @@ public class BeamSortRelTest {
@Test
public void testOrderBy_nullsFirst() throws Exception {
- BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
+ sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
.of(SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
SqlTypeName.DOUBLE, "price",
@@ -96,7 +98,7 @@ public class BeamSortRelTest {
2L, 1, 3.0,
2L, null, 4.0,
5L, 5, 5.0));
- BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
+ sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
.of(SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
SqlTypeName.DOUBLE, "price"));
@@ -106,7 +108,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
- BeamSqlCli.compilePipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
pipeline.run().waitUntilFinish();
assertEquals(
@@ -124,7 +126,7 @@ public class BeamSortRelTest {
@Test
public void testOrderBy_nullsLast() throws Exception {
- BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
+ sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
.of(SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
SqlTypeName.DOUBLE, "price",
@@ -134,7 +136,7 @@ public class BeamSortRelTest {
2L, 1, 3.0,
2L, null, 4.0,
5L, 5, 5.0));
- BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
+ sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
.of(SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
SqlTypeName.DOUBLE, "price"));
@@ -144,7 +146,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
- BeamSqlCli.compilePipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
pipeline.run().waitUntilFinish();
assertEquals(
@@ -167,7 +169,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 4 offset 4";
- BeamSqlCli.compilePipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
pipeline.run().waitUntilFinish();
assertEquals(
@@ -190,7 +192,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 11";
- BeamSqlCli.compilePipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
pipeline.run().waitUntilFinish();
assertEquals(
@@ -221,13 +223,13 @@ public class BeamSortRelTest {
+ "ORDER BY order_id asc limit 11";
TestPipeline pipeline = TestPipeline.create();
- BeamSqlCli.compilePipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
}
@Before
public void prepare() {
- BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailTable);
- BeamSqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable);
+ sqlEnv.registerTable("ORDER_DETAILS", orderDetailTable);
+ sqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable);
MockedBeamSqlTable.CONTENT.clear();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
index c2a0597..c5aa132 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
@@ -34,6 +34,8 @@ import org.junit.Test;
* Test for {@code BeamUnionRel}.
*/
public class BeamUnionRelTest {
+ static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
@Rule
public final TestPipeline pipeline = TestPipeline.create();
private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable
@@ -46,7 +48,7 @@ public class BeamUnionRelTest {
@BeforeClass
public static void prepare() {
- BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+ sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
}
@Test
@@ -58,7 +60,7 @@ public class BeamUnionRelTest {
+ " order_id, site_id, price "
+ "FROM ORDER_DETAILS ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
MockedBeamSqlTable.of(
SqlTypeName.BIGINT, "order_id",
@@ -81,7 +83,7 @@ public class BeamUnionRelTest {
+ " SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
MockedBeamSqlTable.of(
SqlTypeName.BIGINT, "order_id",
http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
index 4557e8e..9a5070a 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
@@ -35,6 +35,8 @@ import org.junit.Test;
* Test for {@code BeamValuesRel}.
*/
public class BeamValuesRelTest {
+ static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
@Rule
public final TestPipeline pipeline = TestPipeline.create();
private static MockedBeamSqlTable stringTable = MockedBeamSqlTable
@@ -49,7 +51,7 @@ public class BeamValuesRelTest {
public void testValues() throws Exception {
String sql = "insert into string_table(name, description) values "
+ "('hello', 'world'), ('james', 'bond')";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
SqlTypeName.VARCHAR, "name",
SqlTypeName.VARCHAR, "description",
@@ -61,7 +63,7 @@ public class BeamValuesRelTest {
@Test
public void testValues_castInt() throws Exception {
String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
SqlTypeName.INTEGER, "c0",
SqlTypeName.INTEGER, "c1",
@@ -73,7 +75,7 @@ public class BeamValuesRelTest {
@Test
public void testValues_onlySelect() throws Exception {
String sql = "select 1, '1'";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
SqlTypeName.INTEGER, "EXPR$0",
SqlTypeName.CHAR, "EXPR$1",
@@ -84,8 +86,8 @@ public class BeamValuesRelTest {
@BeforeClass
public static void prepareClass() {
- BeamSqlEnv.registerTable("string_table", stringTable);
- BeamSqlEnv.registerTable("int_table", intTable);
+ sqlEnv.registerTable("string_table", stringTable);
+ sqlEnv.registerTable("int_table", intTable);
}
@Before
[2/2] beam git commit: [BEAM-2446] This closes #3372
Posted by ta...@apache.org.
[BEAM-2446] This closes #3372
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a680904a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a680904a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a680904a
Branch: refs/heads/DSL_SQL
Commit: a680904a4416be4582ffdfae77a22c2ddca6183d
Parents: 887cf3a c5db277
Author: Tyler Akidau <ta...@apache.org>
Authored: Thu Jun 22 13:19:12 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Jun 22 13:19:12 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/dsls/sql/BeamSql.java | 57 +++++++++++---------
.../org/apache/beam/dsls/sql/BeamSqlCli.java | 18 +++----
.../org/apache/beam/dsls/sql/BeamSqlEnv.java | 12 ++---
.../beam/dsls/sql/example/BeamSqlExample.java | 4 +-
.../math/BeamSqlMathBinaryExpression.java | 1 -
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 7 +--
.../beam/dsls/sql/rel/BeamAggregationRel.java | 7 +--
.../apache/beam/dsls/sql/rel/BeamFilterRel.java | 8 +--
.../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 9 ++--
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 7 ++-
.../beam/dsls/sql/rel/BeamIntersectRel.java | 8 +--
.../apache/beam/dsls/sql/rel/BeamMinusRel.java | 8 +--
.../beam/dsls/sql/rel/BeamProjectRel.java | 7 +--
.../apache/beam/dsls/sql/rel/BeamRelNode.java | 7 +--
.../dsls/sql/rel/BeamSetOperatorRelBase.java | 10 ++--
.../apache/beam/dsls/sql/rel/BeamSortRel.java | 7 +--
.../apache/beam/dsls/sql/rel/BeamUnionRel.java | 8 +--
.../apache/beam/dsls/sql/rel/BeamValuesRel.java | 6 +--
.../beam/dsls/sql/utils/CalciteUtils.java | 1 -
.../beam/dsls/sql/rel/BeamIntersectRelTest.java | 10 ++--
.../beam/dsls/sql/rel/BeamMinusRelTest.java | 10 ++--
.../sql/rel/BeamSetOperatorRelBaseTest.java | 8 +--
.../beam/dsls/sql/rel/BeamSortRelTest.java | 26 ++++-----
.../beam/dsls/sql/rel/BeamUnionRelTest.java | 8 +--
.../beam/dsls/sql/rel/BeamValuesRelTest.java | 12 +++--
25 files changed, 141 insertions(+), 125 deletions(-)
----------------------------------------------------------------------