You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/05 23:34:52 UTC
[1/3] beam git commit: DSL interface for Beam SQL
Repository: beam
Updated Branches:
refs/heads/DSL_SQL dedabff1f -> 9395fbb3c
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
index d4e1db2..4795b2c 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
@@ -18,7 +18,8 @@
package org.apache.beam.dsls.sql.rel;
-import org.apache.beam.dsls.sql.BeamSQLEnvironment;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.sdk.testing.PAssert;
@@ -36,7 +37,6 @@ import org.junit.Test;
public class BeamValuesRelTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
private static MockedBeamSQLTable stringTable = MockedBeamSQLTable
.of(SqlTypeName.VARCHAR, "name",
SqlTypeName.VARCHAR, "description");
@@ -49,7 +49,7 @@ public class BeamValuesRelTest {
public void testValues() throws Exception {
String sql = "insert into string_table(name, description) values "
+ "('hello', 'world'), ('james', 'bond')";
- PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
SqlTypeName.VARCHAR, "name",
SqlTypeName.VARCHAR, "description",
@@ -61,7 +61,7 @@ public class BeamValuesRelTest {
@Test
public void testValues_castInt() throws Exception {
String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
- PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
SqlTypeName.INTEGER, "c0",
SqlTypeName.INTEGER, "c1",
@@ -73,7 +73,7 @@ public class BeamValuesRelTest {
@Test
public void testValues_onlySelect() throws Exception {
String sql = "select 1, '1'";
- PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
SqlTypeName.INTEGER, "EXPR$0",
SqlTypeName.CHAR, "EXPR$1",
@@ -84,8 +84,8 @@ public class BeamValuesRelTest {
@BeforeClass
public static void prepareClass() {
- runner.addTableMetadata("string_table", stringTable);
- runner.addTableMetadata("int_table", intTable);
+ BeamSqlEnv.registerTable("string_table", stringTable);
+ BeamSqlEnv.registerTable("int_table", intTable);
}
@Before
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
index 6f24e2a..cb268bf 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.dsls.sql.schema;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.planner.BasePlanner;
import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -27,17 +29,19 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.BeforeClass;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
/**
* Test case for BeamPCollectionTable.
*/
public class BeamPCollectionTableTest extends BasePlanner{
- public static TestPipeline pipeline = TestPipeline.create();
+ @Rule
+ public final TestPipeline pipeline = TestPipeline.create();
- @BeforeClass
- public static void prepareTable(){
+ @Before
+ public void prepareTable(){
RelProtoDataType protoRowType = new RelProtoDataType() {
@Override
public RelDataType apply(RelDataTypeFactory a0) {
@@ -51,14 +55,16 @@ public class BeamPCollectionTableTest extends BasePlanner{
row.addField(0, 1);
row.addField(1, "hello world.");
PCollection<BeamSQLRow> inputStream = PBegin.in(pipeline).apply(Create.of(row));
- runner.addTableMetadata("COLLECTION_TABLE",
+ BeamSqlEnv.registerTable("COLLECTION_TABLE",
new BeamPCollectionTable(inputStream, protoRowType));
}
@Test
public void testSelectFromPCollectionTable() throws Exception{
String sql = "select c1, c2 from COLLECTION_TABLE";
- runner.executionPlan(sql);
+ PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
+
+ pipeline.run().waitUntilFinish();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
index bc6343b..985b667 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -74,7 +74,7 @@ public class BeamSqlRowCoderTest {
row.addField("col_timestamp", new Date());
- BeamSqlRowCoder coder = BeamSqlRowCoder.of();
+ BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType);
CoderProperties.coderDecodeEncodeEqual(coder, row);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
index f174b9c..dadd53b 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
@@ -23,11 +23,11 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordTypeCoder;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
-import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.SqlKind;
@@ -62,8 +61,15 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
public TestPipeline p = TestPipeline.create();
private List<AggregateCall> aggCalls;
- private BeamSQLRecordType keyType = initTypeOfSqlRow(
- Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
+
+ private BeamSQLRecordType keyType;
+ private BeamSQLRecordType aggPartType;
+ private BeamSQLRecordType outputType;
+
+ private BeamSqlRowCoder inRecordCoder;
+ private BeamSqlRowCoder keyCoder;
+ private BeamSqlRowCoder aggCoder;
+ private BeamSqlRowCoder outRecordCoder;
/**
* This step equals to below query.
@@ -97,21 +103,25 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
//1. extract fields in group-by key part
PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = input.apply("exGroupBy",
WithKeys
- .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))));
+ .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
+ .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, inRecordCoder));
//2. apply a GroupByKey.
PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream
- .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create());
+ .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create())
+ .setCoder(KvCoder.<BeamSQLRow, Iterable<BeamSQLRow>>of(keyCoder,
+ IterableCoder.<BeamSQLRow>of(inRecordCoder)));
//3. run aggregation functions
PCollection<KV<BeamSQLRow, BeamSQLRow>> aggregatedStream = groupedStream.apply("aggregation",
Combine.<BeamSQLRow, BeamSQLRow, BeamSQLRow>groupedValues(
- new BeamAggregationTransforms.AggregationCombineFn(aggCalls, inputRowType)));
+ new BeamAggregationTransforms.AggregationCombineFn(aggCalls, inputRowType)))
+ .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, aggCoder));
//4. flat KV to a single record
PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord",
- ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
- BeamSQLRecordType.from(prepareFinalRowType()), aggCalls)));
+ ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls)));
+ mergedStream.setCoder(outRecordCoder);
//assert function BeamAggregationTransform.AggregationGroupByKeyFn
PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn());
@@ -126,17 +136,8 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
}
private void setupEnvironment() {
- regiesterCoder();
prepareAggregationCalls();
- }
-
- /**
- * Add Coders in BeamSQL.
- */
- private void regiesterCoder() {
- CoderRegistry cr = p.getCoderRegistry();
- cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of());
- cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of());
+ prepareTypeAndCoder();
}
/**
@@ -327,26 +328,15 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
}
/**
- * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
+ * Coders used in aggregation steps.
*/
- private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationGroupByKeyFn() {
- return Arrays.asList(
- KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
- inputRows.get(0)),
- KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
- inputRows.get(1)),
- KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
- inputRows.get(2)),
- KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
- inputRows.get(3)));
- }
+ private void prepareTypeAndCoder() {
+ inRecordCoder = new BeamSqlRowCoder(inputRowType);
- /**
- * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
- */
- private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationCombineFn()
- throws ParseException {
- BeamSQLRecordType aggPartType = initTypeOfSqlRow(
+ keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
+ keyCoder = new BeamSqlRowCoder(keyType);
+
+ aggPartType = initTypeOfSqlRow(
Arrays.asList(KV.of("count", SqlTypeName.BIGINT),
KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
@@ -369,6 +359,32 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
));
+ aggCoder = new BeamSqlRowCoder(aggPartType);
+
+ outputType = prepareFinalRowType();
+ outRecordCoder = new BeamSqlRowCoder(outputType);
+ }
+
+ /**
+ * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
+ */
+ private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationGroupByKeyFn() {
+ return Arrays.asList(
+ KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
+ inputRows.get(0)),
+ KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
+ inputRows.get(1)),
+ KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
+ inputRows.get(2)),
+ KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
+ inputRows.get(3)));
+ }
+
+ /**
+ * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
+ */
+ private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationCombineFn()
+ throws ParseException {
return Arrays.asList(
KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
new BeamSQLRow(aggPartType, Arrays.<Object>asList(
@@ -387,7 +403,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
/**
* Row type of final output row.
*/
- private RelDataType prepareFinalRowType() {
+ private BeamSQLRecordType prepareFinalRowType() {
FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
List<KV<String, SqlTypeName>> columnMetadata =
Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
@@ -415,14 +431,14 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
for (KV<String, SqlTypeName> cm : columnMetadata) {
builder.add(cm.getKey(), cm.getValue());
}
- return builder.build();
+ return BeamSQLRecordType.from(builder.build());
}
/**
* expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}.
*/
private BeamSQLRow prepareResultOfMergeAggregationRecord() throws ParseException {
- return new BeamSQLRow(BeamSQLRecordType.from(prepareFinalRowType()), Arrays.<Object>asList(
+ return new BeamSQLRow(outputType, Arrays.<Object>asList(
1, 4L,
10000L, 2500L, 4000L, 1000L,
(short) 10, (short) 2, (short) 4, (short) 1,
[3/3] beam git commit: [BEAM-2010] expose programming interface
Posted by lc...@apache.org.
[BEAM-2010] expose programming interface
This closes #3250
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9395fbb3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9395fbb3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9395fbb3
Branch: refs/heads/DSL_SQL
Commit: 9395fbb3c8d747b53244fc54a582acfd3e106d76
Parents: dedabff 680a543
Author: Luke Cwik <lc...@google.com>
Authored: Mon Jun 5 16:34:41 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 5 16:34:41 2017 -0700
----------------------------------------------------------------------
.../beam/dsls/sql/BeamSQLEnvironment.java | 142 ----------------
.../java/org/apache/beam/dsls/sql/BeamSql.java | 166 +++++++++++++++++++
.../org/apache/beam/dsls/sql/BeamSqlCli.java | 70 ++++++++
.../org/apache/beam/dsls/sql/BeamSqlEnv.java | 63 +++++++
.../beam/dsls/sql/example/BeamSqlExample.java | 106 +++++-------
.../dsls/sql/planner/BeamPipelineCreator.java | 17 +-
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 21 ++-
.../beam/dsls/sql/rel/BeamAggregationRel.java | 57 +++++--
.../apache/beam/dsls/sql/rel/BeamFilterRel.java | 12 +-
.../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 17 +-
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 25 ++-
.../beam/dsls/sql/rel/BeamProjectRel.java | 11 +-
.../apache/beam/dsls/sql/rel/BeamRelNode.java | 11 +-
.../apache/beam/dsls/sql/rel/BeamSortRel.java | 19 ++-
.../apache/beam/dsls/sql/rel/BeamValuesRel.java | 10 +-
.../beam/dsls/sql/schema/BeamSQLRecordType.java | 22 +++
.../dsls/sql/schema/BeamSQLRecordTypeCoder.java | 87 ----------
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 34 ++--
.../beam/dsls/sql/planner/BasePlanner.java | 10 +-
.../sql/planner/BeamGroupByExplainTest.java | 18 +-
.../sql/planner/BeamGroupByPipelineTest.java | 18 +-
.../sql/planner/BeamInvalidGroupByTest.java | 5 +-
.../BeamPlannerAggregationSubmitTest.java | 12 +-
.../sql/planner/BeamPlannerExplainTest.java | 7 +-
.../dsls/sql/planner/BeamPlannerSubmitTest.java | 3 +-
.../beam/dsls/sql/rel/BeamSortRelTest.java | 28 ++--
.../beam/dsls/sql/rel/BeamValuesRelTest.java | 14 +-
.../sql/schema/BeamPCollectionTableTest.java | 18 +-
.../dsls/sql/schema/BeamSqlRowCoderTest.java | 2 +-
.../transform/BeamAggregationTransformTest.java | 98 ++++++-----
30 files changed, 628 insertions(+), 495 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: DSL interface for Beam SQL
Posted by lc...@apache.org.
DSL interface for Beam SQL
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/680a543d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/680a543d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/680a543d
Branch: refs/heads/DSL_SQL
Commit: 680a543d20d0d8bca25aced0a5f02c38529babf2
Parents: dedabff
Author: mingmxu <mi...@ebay.com>
Authored: Fri May 26 22:07:03 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 5 16:22:08 2017 -0700
----------------------------------------------------------------------
.../beam/dsls/sql/BeamSQLEnvironment.java | 142 ----------------
.../java/org/apache/beam/dsls/sql/BeamSql.java | 166 +++++++++++++++++++
.../org/apache/beam/dsls/sql/BeamSqlCli.java | 70 ++++++++
.../org/apache/beam/dsls/sql/BeamSqlEnv.java | 63 +++++++
.../beam/dsls/sql/example/BeamSqlExample.java | 106 +++++-------
.../dsls/sql/planner/BeamPipelineCreator.java | 17 +-
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 21 ++-
.../beam/dsls/sql/rel/BeamAggregationRel.java | 57 +++++--
.../apache/beam/dsls/sql/rel/BeamFilterRel.java | 12 +-
.../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 17 +-
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 25 ++-
.../beam/dsls/sql/rel/BeamProjectRel.java | 11 +-
.../apache/beam/dsls/sql/rel/BeamRelNode.java | 11 +-
.../apache/beam/dsls/sql/rel/BeamSortRel.java | 19 ++-
.../apache/beam/dsls/sql/rel/BeamValuesRel.java | 10 +-
.../beam/dsls/sql/schema/BeamSQLRecordType.java | 22 +++
.../dsls/sql/schema/BeamSQLRecordTypeCoder.java | 87 ----------
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 34 ++--
.../beam/dsls/sql/planner/BasePlanner.java | 10 +-
.../sql/planner/BeamGroupByExplainTest.java | 18 +-
.../sql/planner/BeamGroupByPipelineTest.java | 18 +-
.../sql/planner/BeamInvalidGroupByTest.java | 5 +-
.../BeamPlannerAggregationSubmitTest.java | 12 +-
.../sql/planner/BeamPlannerExplainTest.java | 7 +-
.../dsls/sql/planner/BeamPlannerSubmitTest.java | 3 +-
.../beam/dsls/sql/rel/BeamSortRelTest.java | 28 ++--
.../beam/dsls/sql/rel/BeamValuesRelTest.java | 14 +-
.../sql/schema/BeamPCollectionTableTest.java | 18 +-
.../dsls/sql/schema/BeamSqlRowCoderTest.java | 2 +-
.../transform/BeamAggregationTransformTest.java | 98 ++++++-----
30 files changed, 628 insertions(+), 495 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java
deleted file mode 100644
index cdb25f5..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql;
-
-import java.io.Serializable;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@code BeamSQLEnvironment} is the integrated environment of BeamSQL.
- * It provides runtime context to execute SQL queries as Beam pipeline,
- * including table metadata, SQL engine and a Beam pipeline translator.
- *
- * <h1>1. BeamSQL as DSL</h1>
- * <em>BeamSQL as DSL</em> enables developers to embed SQL queries when writing a Beam pipeline.
- * A typical pipeline with BeamSQL DSL is:
- * <pre>
- *{@code
-PipelineOptions options = PipelineOptionsFactory...
-Pipeline pipeline = Pipeline.create(options);
-
-//prepare environment of BeamSQL
-BeamSQLEnvironment sqlEnv = BeamSQLEnvironment.create();
-//register table metadata
-sqlEnv.addTableMetadata(String tableName, BeamSqlTable tableMetadata);
-//register UDF
-sqlEnv.registerUDF(String functionName, Method udfMethod);
-
-
-//explain a SQL statement, SELECT only, and return as a PCollection;
-PCollection<BeamSQLRow> phase1Stream = sqlEnv.explainSQL(pipeline, String sqlStatement);
-//A PCollection explained by BeamSQL can be converted into a table, and apply queries on it;
-sqlEnv.registerPCollectionAsTable(String tableName, phase1Stream);
-
-//apply more queries, even based on phase1Stream
-
-pipeline.run().waitUntilFinish();
- * }
- * </pre>
- *
- * <h1>2. BeamSQL as CLI</h1>
- * This feature is on planning, and not ready yet.
- *
- */
-public class BeamSQLEnvironment implements Serializable {
- private static final Logger LOG = LoggerFactory.getLogger(BeamSQLEnvironment.class);
-
- public static final BeamSQLEnvironment INSTANCE = new BeamSQLEnvironment();
-
- private SchemaPlus schema = Frameworks.createRootSchema(true);
- private BeamQueryPlanner planner = new BeamQueryPlanner(schema);
-
- private BeamSQLEnvironment() {
- //disable assertions in Calcite.
- ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false);
- }
-
- /**
- * Return an instance of {@code BeamSQLEnvironment}.
- */
- public static BeamSQLEnvironment create(){
- return INSTANCE;
- }
-
- /**
- * Add a schema.
- *
- */
- public void addSchema(String schemaName, Schema scheme) {
- schema.add(schemaName, schema);
- }
-
- /**
- * add a {@link BaseBeamTable} to schema repository.
- */
- public void addTableMetadata(String tableName, BaseBeamTable tableMetadata) {
- schema.add(tableName, tableMetadata);
- planner.getSourceTables().put(tableName, tableMetadata);
- }
-
- /* Add a UDF function.
- *
- * <p>There're two requirements for function {@code methodName}:<br>
- * 1. It must be a STATIC method;<br>
- * 2. For a primitive parameter, use its wrapper class and handle NULL properly;
- */
- public void addUDFFunction(String functionName, Class<?> className, String methodName){
- schema.add(functionName, ScalarFunctionImpl.create(className, methodName));
- }
-
- /**
- * explain and display the execution plan.
- */
- public String executionPlan(String sqlString)
- throws ValidationException, RelConversionException, SqlParseException {
- BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
- String beamPlan = RelOptUtil.toString(exeTree);
- LOG.info(String.format("beamPlan>\n%s", beamPlan));
- return beamPlan;
- }
-
- /**
- * {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow,
- * which is linked with the given {@code pipeline}. The final output stream is returned as
- * {@code PCollection} so more operations can be applied.
- */
- public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline)
- throws Exception{
- PCollection<BeamSQLRow> resultStream = planner.compileBeamPipeline(sqlStatement, basePipeline);
- return resultStream;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
new file mode 100644
index 0000000..8c2c5ad
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+/**
+ * {@code BeamSql} is the DSL interface of BeamSQL. It translates a SQL query into a
+ * {@link PTransform}, so developers can use standard SQL queries in a Beam pipeline.
+ *
+ * <h1>Beam SQL DSL usage:</h1>
+ * A typical pipeline with Beam SQL DSL is:
+ * <pre>
+ *{@code
+PipelineOptions options = PipelineOptionsFactory.create();
+Pipeline p = Pipeline.create(options);
+
+//create table from TextIO;
+TableSchema tableASchema = ...;
+PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha"))
+ .apply(BeamSql.fromTextRow(tableASchema));
+TableSchema tableBSchema = ...;
+PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb"))
+ .apply(BeamSql.fromTextRow(tableBSchema));
+
+//run a simple query, and register the output as a table in BeamSql;
+String sql1 = "select MY_FUNC(c1), c2 from TABLE_A";
+PCollection<BeamSqlRow> outputTableA = inputTableA.apply(
+ BeamSql.simpleQuery(sql1).withUdf("MY_FUNC", myFunc));
+
+//run a JOIN with one table from TextIO, and one table from another query
+PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of(
+ new TupleTag<BeamSqlRow>("TABLE_O_A"), outputTableA)
+ .and(new TupleTag<BeamSqlRow>("TABLE_B"), inputTableB)
+ .apply(BeamSql.query("select * from TABLE_O_A JOIN TABLE_B where ..."));
+
+//output the final result with TextIO
+outputTableB.apply(BeamSql.toTextRow()).apply(TextIO.write().to("/my/output/path"));
+
+p.run().waitUntilFinish();
+ * }
+ * </pre>
+ */
+@Experimental
+public class BeamSql {
+ /**
+ * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+ *
+ * <p>The returned {@link PTransform} can be applied to a {@link PCollectionTuple} representing
+ * all the input tables and results in a {@code PCollection<BeamSQLRow>} representing the output
+ * table. The {@link PCollectionTuple} contains the mapping from {@code table names} to
+ * {@code PCollection<BeamSQLRow>}, each representing an input table.
+ *
+ * <p>It is an error to apply a {@link PCollectionTuple} missing any {@code table names}
+ * referenced within the query.
+ */
+ public static PTransform<PCollectionTuple, PCollection<BeamSQLRow>> query(String sqlQuery) {
+ return new QueryTransform(sqlQuery);
+
+ }
+
+ /**
+ * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+ *
+ * <p>This is a simplified form of {@link #query(String)} where the query must reference
+ * a single input table.
+ */
+ public static PTransform<PCollection<BeamSQLRow>, PCollection<BeamSQLRow>>
+ simpleQuery(String sqlQuery) throws Exception {
+ return new SimpleQueryTransform(sqlQuery);
+ }
+
+ /**
+ * A {@link PTransform} representing an execution plan for a SQL query.
+ */
+ public static class QueryTransform extends PTransform<PCollectionTuple, PCollection<BeamSQLRow>> {
+ private String sqlQuery;
+ public QueryTransform(String sqlQuery) {
+ this.sqlQuery = sqlQuery;
+ }
+
+ @Override
+ public PCollection<BeamSQLRow> expand(PCollectionTuple input) {
+ BeamRelNode beamRelNode = null;
+ try {
+ beamRelNode = BeamSqlEnv.planner.convertToBeamRel(sqlQuery);
+ } catch (ValidationException | RelConversionException | SqlParseException e) {
+ throw new IllegalStateException(e);
+ }
+
+ try {
+ return beamRelNode.buildBeamPipeline(input);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ /**
+ * A {@link PTransform} representing an execution plan for a SQL query referencing
+ * a single table.
+ */
+ public static class SimpleQueryTransform
+ extends PTransform<PCollection<BeamSQLRow>, PCollection<BeamSQLRow>> {
+ private String sqlQuery;
+ public SimpleQueryTransform(String sqlQuery) {
+ this.sqlQuery = sqlQuery;
+ }
+
+ public SimpleQueryTransform withUdf(String udfName){
+ throw new BeamSqlUnsupportedException("Pending for UDF support");
+ }
+
+ @Override
+ public PCollection<BeamSQLRow> expand(PCollection<BeamSQLRow> input) {
+ SqlNode sqlNode;
+ try {
+ sqlNode = BeamSqlEnv.planner.parseQuery(sqlQuery);
+ BeamSqlEnv.planner.getPlanner().close();
+ } catch (SqlParseException e) {
+ throw new IllegalStateException(e);
+ }
+ BeamSqlRowCoder inputCoder = (BeamSqlRowCoder) input.getCoder();
+
+ if (sqlNode instanceof SqlSelect) {
+ SqlSelect select = (SqlSelect) sqlNode;
+ String tableName = select.getFrom().toString();
+ BeamSqlEnv.registerTable(tableName,
+ new BeamPCollectionTable(input, inputCoder.getTableSchema().toRelDataType()));
+ return PCollectionTuple.of(new TupleTag<BeamSQLRow>(tableName), input)
+ .apply(BeamSql.query(sqlQuery));
+ } else {
+ throw new BeamSqlUnsupportedException(sqlNode.toString());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
new file mode 100644
index 0000000..6591589
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+/**
+ * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client.
+ */
+@Experimental
+public class BeamSqlCli {
+
+ /**
+ * Returns a human readable representation of the query execution plan.
+ */
+ public static String explainQuery(String sqlString)
+ throws ValidationException, RelConversionException, SqlParseException {
+ BeamRelNode exeTree = BeamSqlEnv.planner.convertToBeamRel(sqlString);
+ String beamPlan = RelOptUtil.toString(exeTree);
+ return beamPlan;
+ }
+
+ /**
+ * Compiles SQL on a new {@link Pipeline}, and returns the output {@link PCollection}.
+ */
+ public static PCollection<BeamSQLRow> compilePipeline(String sqlStatement) throws Exception{
+ PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
+ .as(PipelineOptions.class); // FlinkPipelineOptions.class
+ options.setJobName("BeamPlanCreator");
+ Pipeline pipeline = Pipeline.create(options);
+
+ return compilePipeline(sqlStatement, pipeline);
+ }
+
+ /**
+ * Compiles SQL on {@code basePipeline}, and returns the output {@link PCollection}.
+ */
+ public static PCollection<BeamSQLRow> compilePipeline(String sqlStatement, Pipeline basePipeline)
+ throws Exception{
+ PCollection<BeamSQLRow> resultStream =
+ BeamSqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline);
+ return resultStream;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
new file mode 100644
index 0000000..af6c007
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.tools.Frameworks;
+
+/**
+ * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}.
+ *
+ * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and
+ * a {@link BeamQueryPlanner} which parses/validates/optimizes/translates input SQL queries.
+ */
+public class BeamSqlEnv {
+ public static SchemaPlus schema;
+ public static BeamQueryPlanner planner;
+
+ static {
+ schema = Frameworks.createRootSchema(true);
+ planner = new BeamQueryPlanner(schema);
+ }
+
+ /**
+ * Registers a scalar UDF, backed by {@code methodName} of {@code clazz}, for use in SQL expressions.
+ */
+ public static void registerUdf(String functionName, Class<?> clazz, String methodName) {
+ schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName));
+ }
+
+ /**
+ * Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
+ *
+ */
+ public static void registerTable(String tableName, BaseBeamTable table) {
+ schema.add(tableName, table);
+ planner.getSourceTables().put(tableName, table);
+ }
+
+ /**
+ * Find {@link BaseBeamTable} by table name.
+ */
+ public static BaseBeamTable findTable(String tableName){
+ return planner.getSourceTables().get(tableName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 2695944..6a1b81d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -17,93 +17,61 @@
*/
package org.apache.beam.dsls.sql.example;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.dsls.sql.BeamSQLEnvironment;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.BeamSql;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This is one quick example.
+ * This is a quick example, which uses Beam SQL DSL to create a data pipeline.
*
- * <p>Before start, follow https://kafka.apache.org/quickstart to setup a Kafka
- * cluster locally, and run below commands to create required Kafka topics:
- * <pre>
- * <code>
- * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
- * --partitions 1 --topic orders
- * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
- * --partitions 1 --topic sub_orders
- * </code>
- * </pre>
- * After run the application, produce several test records:
- * <pre>
- * <code>
- * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic orders
- * invalid,record
- * 123445,0,100,3413423
- * 234123,3,232,3451231234
- * 234234,0,5,1234123
- * 345234,0,345234.345,3423
- * </code>
- * </pre>
- * Meanwhile, open another console to see the output:
- * <pre>
- * <code>
- * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sub_orders
- * **Expected :
- * 123445,0,100.0
- * 345234,0,345234.345
- * </code>
- * </pre>
*/
-public class BeamSqlExample implements Serializable {
+public class BeamSqlExample {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamSqlExample.class);
public static void main(String[] args) throws Exception {
- PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
- .as(PipelineOptions.class); // FlinkPipelineOptions.class
- options.setJobName("BeamSqlExample");
- Pipeline pipeline = Pipeline.create(options);
+ PipelineOptions options = PipelineOptionsFactory.create();
+ Pipeline p = Pipeline.create(options);
- BeamSQLEnvironment runner = BeamSQLEnvironment.create();
- runner.addTableMetadata("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders"));
- runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+ //define the input row format
+ BeamSQLRecordType type = new BeamSQLRecordType();
+ type.addField("c1", SqlTypeName.INTEGER);
+ type.addField("c2", SqlTypeName.VARCHAR);
+ type.addField("c3", SqlTypeName.DOUBLE);
+ BeamSQLRow row = new BeamSQLRow(type);
+ row.addField(0, 1);
+ row.addField(1, "row");
+ row.addField(2, 1.0);
- // case 2: insert into <table>(<fields>) select STREAM <fields> from
- // <table> from <clause>
- String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
- + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
+ //create a source PCollection with Create.of();
+ PCollection<BeamSQLRow> inputTable = PBegin.in(p).apply(Create.of(row)
+ .withCoder(new BeamSqlRowCoder(type)));
- PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+ //run a simple SQL query over input PCollection;
+ String sql = "select c2, c3 from TABLE_A where c1=1";
+ PCollection<BeamSQLRow> outputStream = inputTable.apply(BeamSql.simpleQuery(sql));
- pipeline.run().waitUntilFinish();
- }
-
- public static BaseBeamTable getTable(String bootstrapServer, String topic) {
- final RelProtoDataType protoRowType = new RelProtoDataType() {
+ //log out the output record;
+ outputStream.apply("log_result",
+ MapElements.<BeamSQLRow, Void>via(new SimpleFunction<BeamSQLRow, Void>() {
@Override
- public RelDataType apply(RelDataTypeFactory a0) {
- return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
- .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
+ public Void apply(BeamSQLRow input) {
+ LOG.info(input.valueInString());
+ return null;
}
- };
-
- Map<String, Object> consumerPara = new HashMap<String, Object>();
- consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ }));
- return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic))
- .updateConsumerProperties(consumerPara);
+ p.run().waitUntilFinish();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
index 1f3ba58..abdc66c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
@@ -18,16 +18,9 @@
package org.apache.beam.dsls.sql.planner;
import java.util.Map;
-
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordTypeCoder;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.options.PipelineOptions;
/**
* {@link BeamPipelineCreator} converts a {@link BeamRelNode} tree, into a Beam
@@ -37,19 +30,13 @@ import org.apache.beam.sdk.options.PipelineOptions;
public class BeamPipelineCreator {
private Map<String, BaseBeamTable> sourceTables;
- private PipelineOptions options;
-
private Pipeline pipeline;
private boolean hasPersistent = false;
- public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables, Pipeline pipeline) {
+ public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables, Pipeline basePipeline) {
this.sourceTables = sourceTables;
- this.pipeline = pipeline;
-
- CoderRegistry cr = pipeline.getCoderRegistry();
- cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of());
- cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of());
+ this.pipeline = basePipeline;
}
public Map<String, BaseBeamTable> getSourceTables() {
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 0a7407c..6f148d6 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -22,13 +22,13 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteSchema;
@@ -95,17 +95,24 @@ public class BeamQueryPlanner {
}
/**
+ * Parse input SQL query, and return a {@link SqlNode} as grammar tree.
+ */
+ public SqlNode parseQuery(String sqlQuery) throws SqlParseException{
+ return planner.parse(sqlQuery);
+ }
+
+ /**
* {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow,
* which is linked with the given {@code pipeline}. The final output stream is returned as
* {@code PCollection} so more operations can be applied.
*/
- public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline pipeline)
+ public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline)
throws Exception {
BeamRelNode relNode = convertToBeamRel(sqlStatement);
- BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables, pipeline);
-
- return relNode.buildBeamPipeline(planCreator);
+ BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables, basePipeline);
+ // the input PCollectionTuple is empty, so source PCollections are built in BeamIOSourceRel.
+ return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline));
}
/**
@@ -155,4 +162,8 @@ public class BeamQueryPlanner {
return sourceTables;
}
+ public Planner getPlanner() {
+ return planner;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 3e147aa..6914883 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -18,11 +18,13 @@
package org.apache.beam.dsls.sql.rel;
import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
@@ -34,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
@@ -41,6 +44,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
import org.joda.time.Duration;
@@ -67,16 +71,17 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
}
@Override
- public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+ public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
throws Exception {
RelNode input = getInput();
String stageName = BeamSQLRelUtils.getStageName(this);
PCollection<BeamSQLRow> upstream =
- BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+ BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
if (windowFieldIdx != -1) {
upstream = upstream.apply("assignEventTimestamp", WithTimestamps
- .<BeamSQLRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)));
+ .<BeamSQLRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
+ .setCoder(upstream.getCoder());
}
PCollection<BeamSQLRow> windowStream = upstream.apply("window",
@@ -85,29 +90,59 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
.withAllowedLateness(allowedLatence)
.accumulatingFiredPanes());
- //1. extract fields in group-by key part
+ BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = windowStream.apply("exGroupBy",
WithKeys
- .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(windowFieldIdx, groupSet)));
+ .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
+ windowFieldIdx, groupSet)))
+ .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, upstream.getCoder()));
- //2. apply a GroupByKey.
PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream
- .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create());
+ .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create())
+ .setCoder(KvCoder.<BeamSQLRow, Iterable<BeamSQLRow>>of(keyCoder,
+ IterableCoder.<BeamSQLRow>of(upstream.getCoder())));
- //3. run aggregation functions
+ BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
PCollection<KV<BeamSQLRow, BeamSQLRow>> aggregatedStream = groupedStream.apply("aggregation",
Combine.<BeamSQLRow, BeamSQLRow, BeamSQLRow>groupedValues(
new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(),
- BeamSQLRecordType.from(input.getRowType()))));
+ BeamSQLRecordType.from(input.getRowType()))))
+ .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, aggCoder));
- //4. flat KV to a single record
PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord",
ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
BeamSQLRecordType.from(getRowType()), getAggCallList())));
+ mergedStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
return mergedStream;
}
+ /**
+ * Type of the sub-record used as Group-By keys, excluding the window field.
+ */
+ private BeamSQLRecordType exKeyFieldsSchema(RelDataType relDataType) {
+ BeamSQLRecordType inputRecordType = BeamSQLRecordType.from(relDataType);
+ BeamSQLRecordType typeOfKey = new BeamSQLRecordType();
+ for (int i : groupSet.asList()) {
+ if (i != windowFieldIdx) {
+ typeOfKey.addField(inputRecordType.getFieldsName().get(i),
+ inputRecordType.getFieldsType().get(i));
+ }
+ }
+ return typeOfKey;
+ }
+
+ /**
+ * Type of the sub-record that represents the list of aggregation fields.
+ */
+ private BeamSQLRecordType exAggFieldsSchema() {
+ BeamSQLRecordType typeOfAggFields = new BeamSQLRecordType();
+ for (AggregateCall ac : getAggCallList()) {
+ typeOfAggFields.addField(ac.name, ac.type.getSqlTypeName());
+ }
+ return typeOfAggFields;
+ }
+
@Override
public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator
, ImmutableBitSet groupSet,
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index f2c1bba..3387071 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -19,12 +19,14 @@ package org.apache.beam.dsls.sql.rel;
import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.dsls.sql.transform.BeamSQLFilterFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
@@ -48,20 +50,20 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
}
@Override
- public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+ public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
throws Exception {
RelNode input = getInput();
-
String stageName = BeamSQLRelUtils.getStageName(this);
- PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input)
- .buildBeamPipeline(planCreator);
+ PCollection<BeamSQLRow> upstream =
+ BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
PCollection<BeamSQLRow> filterStream = upstream.apply(stageName,
ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
+ filterStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
return filterStream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
index bc94ab8..f821700 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -19,12 +19,13 @@ package org.apache.beam.dsls.sql.rel;
import com.google.common.base.Joiner;
import java.util.List;
-
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PDone;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
@@ -52,22 +53,24 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
}
/**
- * Note that {@code BeamIOSinkRel} returns the input PCollection.
+ * Note that {@code BeamIOSinkRel} returns the input PCollection,
+ * which is the persisted PCollection.
*/
@Override
- public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+ public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
throws Exception {
+
RelNode input = getInput();
String stageName = BeamSQLRelUtils.getStageName(this);
PCollection<BeamSQLRow> upstream =
- BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+ BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
- BaseBeamTable targetTable = planCreator.getSourceTables().get(sourceName);
+ BaseBeamTable targetTable = BeamSqlEnv.findTable(sourceName);
- upstream.apply(stageName, targetTable.buildIOWriter());
+ PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter());
return upstream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index f4d5001..38de41e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -18,12 +18,13 @@
package org.apache.beam.dsls.sql.rel;
import com.google.common.base.Joiner;
-
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
@@ -40,18 +41,24 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
}
@Override
- public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+ public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
throws Exception {
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", "");
-
- BaseBeamTable sourceTable = planCreator.getSourceTables().get(sourceName);
+ String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
String stageName = BeamSQLRelUtils.getStageName(this);
- PCollection<BeamSQLRow> sourceStream = sourceTable.buildIOReader(planCreator.getPipeline());
-
- return sourceStream;
+ TupleTag<BeamSQLRow> sourceTupleTag = new TupleTag<BeamSQLRow>(sourceName);
+ if (inputPCollections.has(sourceTupleTag)) {
+ //use the PCollection from the input PCollectionTuple if it is present there.
+ PCollection<BeamSQLRow> sourceStream = inputPCollections
+ .get(new TupleTag<BeamSQLRow>(sourceName));
+ return sourceStream;
+ } else {
+ //If not, the source PCollection is provided by BaseBeamTable.buildIOReader().
+ BaseBeamTable sourceTable = BeamSqlEnv.findTable(sourceName);
+ return sourceTable.buildIOReader(inputPCollections.getPipeline());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index 954868d..e2645f1 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -21,13 +21,14 @@ import java.util.List;
import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.dsls.sql.transform.BeamSQLProjectFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
@@ -60,21 +61,21 @@ public class BeamProjectRel extends Project implements BeamRelNode {
}
@Override
- public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+ public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
throws Exception {
RelNode input = getInput();
String stageName = BeamSQLRelUtils.getStageName(this);
- PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input)
- .buildBeamPipeline(planCreator);
+ PCollection<BeamSQLRow> upstream =
+ BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
.of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
+ projectStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
return projectStream;
-
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
index ff2b5b6..ed58090 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
@@ -20,18 +20,19 @@ package org.apache.beam.dsls.sql.rel;
import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.rel.RelNode;
/**
- * A new method {@link #buildBeamPipeline(BeamPipelineCreator)} is added, it's
+ * A new method {@link #buildBeamPipeline(PCollectionTuple)} is added, it's
* called by {@link BeamPipelineCreator}.
- *
*/
public interface BeamRelNode extends RelNode {
/**
- * {@code #buildBeamPipeline(BeamPipelineCreator)} applies a transform to upstream,
- * and generate an output {@code PCollection}.
+ * A {@link BeamRelNode} is a recursive structure; the
+ * {@link BeamPipelineCreator} visits it with a DFS (Depth-First-Search)
+ * algorithm.
*/
- PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception;
+ PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
index 3df2f34..06a4edf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -26,16 +26,19 @@ import java.util.Comparator;
import java.util.List;
import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.dsls.sql.schema.UnsupportedDataTypeException;
+import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
@@ -120,11 +123,11 @@ public class BeamSortRel extends Sort implements BeamRelNode {
}
}
- @Override public PCollection<BeamSQLRow> buildBeamPipeline(
- BeamPipelineCreator planCreator) throws Exception {
+ @Override public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+ throws Exception {
RelNode input = getInput();
PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input)
- .buildBeamPipeline(planCreator);
+ .buildBeamPipeline(inputPCollections);
Type windowType = upstream.getWindowingStrategy().getWindowFn()
.getWindowTypeDescriptor().getType();
if (!windowType.equals(GlobalWindow.class)) {
@@ -137,16 +140,20 @@ public class BeamSortRel extends Sort implements BeamRelNode {
// first find the top (offset + count)
PCollection<List<BeamSQLRow>> rawStream =
upstream.apply("extractTopOffsetAndFetch",
- Top.of(startIndex + count, comparator).withoutDefaults());
+ Top.of(startIndex + count, comparator).withoutDefaults())
+ .setCoder(ListCoder.<BeamSQLRow>of(upstream.getCoder()));
// strip the `leading offset`
if (startIndex > 0) {
rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
- new SubListFn<BeamSQLRow>(startIndex, startIndex + count)));
+ new SubListFn<BeamSQLRow>(startIndex, startIndex + count)))
+ .setCoder(ListCoder.<BeamSQLRow>of(upstream.getCoder()));
}
PCollection<BeamSQLRow> orderedStream = rawStream.apply(
"flatten", Flatten.<BeamSQLRow>iterables());
+ orderedStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
+
return orderedStream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
index 4fbe7ec..ea59906 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -23,13 +23,14 @@ import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.dsls.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.core.Values;
@@ -56,8 +57,8 @@ public class BeamValuesRel extends Values implements BeamRelNode {
}
- @Override public PCollection<BeamSQLRow> buildBeamPipeline(
- BeamPipelineCreator planCreator) throws Exception {
+ @Override public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+ throws Exception {
List<BeamSQLRow> rows = new ArrayList<>(tuples.size());
String stageName = BeamSQLRelUtils.getStageName(this);
if (tuples.isEmpty()) {
@@ -73,6 +74,7 @@ public class BeamValuesRel extends Values implements BeamRelNode {
rows.add(row);
}
- return planCreator.getPipeline().apply(stageName, Create.of(rows));
+ return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
+ .setCoder(new BeamSqlRowCoder(beamSQLRecordType));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
index 94531f0..e8fa82f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
@@ -21,7 +21,10 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -33,6 +36,9 @@ public class BeamSQLRecordType implements Serializable {
private List<String> fieldsName = new ArrayList<>();
private List<SqlTypeName> fieldsType = new ArrayList<>();
+ /**
+ * Generate a {@link BeamSQLRecordType} from the {@link RelDataType} used to create a table.
+ */
public static BeamSQLRecordType from(RelDataType tableInfo) {
BeamSQLRecordType record = new BeamSQLRecordType();
for (RelDataTypeField f : tableInfo.getFieldList()) {
@@ -47,6 +53,22 @@ public class BeamSQLRecordType implements Serializable {
fieldsType.add(fieldType);
}
+ /**
+ * Create a {@link RelProtoDataType} that builds the {@link RelDataType} used to create a table.
+ */
+ public RelProtoDataType toRelDataType() {
+ return new RelProtoDataType() {
+ @Override
+ public RelDataType apply(RelDataTypeFactory a) {
+ FieldInfoBuilder builder = a.builder();
+ for (int idx = 0; idx < fieldsName.size(); ++idx) {
+ builder.add(fieldsName.get(idx), fieldsType.get(idx));
+ }
+ return builder.build();
+ }
+ };
+ }
+
public int size() {
return fieldsName.size();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
deleted file mode 100644
index b88a195..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * A {@link Coder} for {@link BeamSQLRecordType}.
- *
- */
-public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> {
- private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
- private static final VarIntCoder intCoder = VarIntCoder.of();
-
- private static final BeamSQLRecordTypeCoder INSTANCE = new BeamSQLRecordTypeCoder();
- private BeamSQLRecordTypeCoder(){}
-
- public static BeamSQLRecordTypeCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(BeamSQLRecordType value, OutputStream outStream,
- org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
- Context nested = context.nested();
- intCoder.encode(value.size(), outStream, nested);
- for (String fieldName : value.getFieldsName()) {
- stringCoder.encode(fieldName, outStream, nested);
- }
- for (SqlTypeName fieldType : value.getFieldsType()) {
- stringCoder.encode(fieldType.name(), outStream, nested);
- }
- //add a dummy field to indicate the end of record
- intCoder.encode(value.size(), outStream, context);
- }
-
- @Override
- public BeamSQLRecordType decode(InputStream inStream,
- org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
- BeamSQLRecordType typeRecord = new BeamSQLRecordType();
- int size = intCoder.decode(inStream, context.nested());
- for (int idx = 0; idx < size; ++idx) {
- typeRecord.getFieldsName().add(stringCoder.decode(inStream, context.nested()));
- }
- for (int idx = 0; idx < size; ++idx) {
- typeRecord.getFieldsType().add(
- SqlTypeName.valueOf(stringCoder.decode(inStream, context.nested())));
- }
- intCoder.decode(inStream, context);
- return typeRecord;
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return null;
- }
-
- @Override
- public void verifyDeterministic()
- throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index 0bfe467..f161d27 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -23,24 +23,22 @@ import java.io.OutputStream;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
-
import org.apache.beam.sdk.coders.BigDecimalCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
/**
- * A {@link Coder} encodes {@link BeamSQLRow}.
- *
+ * A {@link Coder} that encodes {@link BeamSQLRow}.
*/
-public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
- private static final BeamSQLRecordTypeCoder recordTypeCoder = BeamSQLRecordTypeCoder.of();
+public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
+ private BeamSQLRecordType tableSchema;
private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
@@ -51,17 +49,13 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
private static final InstantCoder instantCoder = InstantCoder.of();
private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
- private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();
- private BeamSqlRowCoder(){}
-
- public static BeamSqlRowCoder of() {
- return INSTANCE;
+ public BeamSqlRowCoder(BeamSQLRecordType tableSchema) {
+ this.tableSchema = tableSchema;
}
@Override
public void encode(BeamSQLRow value, OutputStream outStream,
org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
- recordTypeCoder.encode(value.getDataType(), outStream, context.nested());
listCoder.encode(value.getNullFields(), outStream, context.nested());
for (int idx = 0; idx < value.size(); ++idx) {
@@ -115,18 +109,17 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
@Override
public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
throws CoderException, IOException {
- BeamSQLRecordType type = recordTypeCoder.decode(inStream, context.nested());
List<Integer> nullFields = listCoder.decode(inStream, context.nested());
- BeamSQLRow record = new BeamSQLRow(type);
+ BeamSQLRow record = new BeamSQLRow(tableSchema);
record.setNullFields(nullFields);
- for (int idx = 0; idx < type.size(); ++idx) {
+ for (int idx = 0; idx < tableSchema.size(); ++idx) {
if (nullFields.contains(idx)) {
continue;
}
- switch (type.getFieldsType().get(idx)) {
+ switch (tableSchema.getFieldsType().get(idx)) {
case INTEGER:
record.addField(idx, intCoder.decode(inStream, context.nested()));
break;
@@ -162,7 +155,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
break;
default:
- throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
+ throw new UnsupportedDataTypeException(tableSchema.getFieldsType().get(idx));
}
}
@@ -172,15 +165,12 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
return record;
}
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return null;
+ public BeamSQLRecordType getTableSchema() {
+ return tableSchema;
}
@Override
public void verifyDeterministic()
throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-
}
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
index 0d9d147..03f7705 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
@@ -21,7 +21,7 @@ import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import org.apache.beam.dsls.sql.BeamSQLEnvironment;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -38,13 +38,11 @@ import org.junit.BeforeClass;
*
*/
public class BasePlanner {
- public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
-
@BeforeClass
public static void prepareClass() {
- runner.addTableMetadata("ORDER_DETAILS", getTable());
- runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
- runner.addTableMetadata("SUB_ORDER_RAM", getTable());
+ BeamSqlEnv.registerTable("ORDER_DETAILS", getTable());
+ BeamSqlEnv.registerTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+ BeamSqlEnv.registerTable("SUB_ORDER_RAM", getTable());
}
private static BaseBeamTable getTable() {
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
index 98d14c3..4ea0662 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.dsls.sql.planner;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
import org.junit.Test;
@@ -33,7 +35,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
public void testSimpleGroupExplain() throws Exception {
String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 ";
- String plan = runner.executionPlan(sql);
+ String plan = BeamSqlCli.explainQuery(sql);
}
/**
@@ -43,7 +45,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
public void testSimpleGroup2Explain() throws Exception {
String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 " + "GROUP BY site_id";
- String plan = runner.executionPlan(sql);
+ String plan = BeamSqlCli.explainQuery(sql);
}
/**
@@ -54,7 +56,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ ", TUMBLE(order_time, INTERVAL '1' HOUR)";
- String plan = runner.executionPlan(sql);
+ String plan = BeamSqlCli.explainQuery(sql);
}
/**
@@ -66,7 +68,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
+ "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
+ ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
+ "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
- String plan = runner.executionPlan(sql);
+ String plan = BeamSqlCli.explainQuery(sql);
}
/**
@@ -77,7 +79,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
- String plan = runner.executionPlan(sql);
+ String plan = BeamSqlCli.explainQuery(sql);
}
/**
@@ -88,7 +90,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ ", SESSION(order_time, INTERVAL '5' MINUTE)";
- String plan = runner.executionPlan(sql);
+ String plan = BeamSqlCli.explainQuery(sql);
}
/**
@@ -96,9 +98,9 @@ public class BeamGroupByExplainTest extends BasePlanner {
*/
@Test
public void testUdf() throws Exception {
- runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
+ BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS";
- String plan = runner.executionPlan(sql);
+ String plan = BeamSqlCli.explainQuery(sql);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
index 5101c98..0436ca1 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.dsls.sql.planner;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -37,7 +39,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
public void testSimpleGroupExplain() throws Exception {
String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 ";
- PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
}
/**
@@ -47,7 +49,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
public void testSimpleGroup2Explain() throws Exception {
String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 " + "GROUP BY site_id";
- PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
}
/**
@@ -58,7 +60,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ ", TUMBLE(order_time, INTERVAL '1' HOUR)";
- PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
}
/**
@@ -70,7 +72,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
+ "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
+ ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
+ "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
- PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
}
/**
@@ -81,7 +83,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
- PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
}
/**
@@ -92,7 +94,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ ", SESSION(order_time, INTERVAL '5' MINUTE)";
- PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
}
/**
@@ -100,10 +102,10 @@ public class BeamGroupByPipelineTest extends BasePlanner {
*/
@Test
public void testUdf() throws Exception {
- runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
+ BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS";
- PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
index 72b5bf7..946a9fd 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.dsls.sql.planner;
+import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
@@ -36,7 +37,7 @@ public class BeamInvalidGroupByTest extends BasePlanner {
public void testTumble2Explain() throws Exception {
String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 " + "GROUP BY order_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
- PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
}
@Test(expected = ValidationException.class)
@@ -44,7 +45,7 @@ public class BeamInvalidGroupByTest extends BasePlanner {
String sql = "SELECT order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
+ ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
+ "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
- PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
index ffc3e01..a296eec 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
@@ -22,7 +22,8 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
-import org.apache.beam.dsls.sql.BeamSQLEnvironment;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -43,15 +44,14 @@ import org.junit.Test;
*/
public class BeamPlannerAggregationSubmitTest {
public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
@Rule
public final TestPipeline pipeline = TestPipeline.create();
@BeforeClass
public static void prepareClass() throws ParseException {
- runner.addTableMetadata("ORDER_DETAILS", getOrderTable());
- runner.addTableMetadata("ORDER_SUMMARY", getSummaryTable());
+ BeamSqlEnv.registerTable("ORDER_DETAILS", getOrderTable());
+ BeamSqlEnv.registerTable("ORDER_SUMMARY", getSummaryTable());
}
@Before
@@ -120,7 +120,7 @@ public class BeamPlannerAggregationSubmitTest {
+ "WHERE SITE_ID = 1 " + "GROUP BY site_id"
+ ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
- runner.compileBeamPipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline);
pipeline.run().waitUntilFinish();
@@ -137,7 +137,7 @@ public class BeamPlannerAggregationSubmitTest {
+ "SELECT site_id, COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 " + "GROUP BY site_id";
- runner.compileBeamPipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline);
pipeline.run().waitUntilFinish();
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
index 1355d5d..e617ff2 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.dsls.sql.planner;
+import org.apache.beam.dsls.sql.BeamSqlCli;
import org.junit.Assert;
import org.junit.Test;
@@ -28,7 +29,7 @@ public class BeamPlannerExplainTest extends BasePlanner {
@Test
public void selectAll() throws Exception {
String sql = "SELECT * FROM ORDER_DETAILS";
- String plan = runner.executionPlan(sql);
+ String plan = BeamSqlCli.explainQuery(sql);
String expectedPlan =
"BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n"
@@ -40,7 +41,7 @@ public class BeamPlannerExplainTest extends BasePlanner {
public void selectWithFilter() throws Exception {
String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 and price > 20";
- String plan = runner.executionPlan(sql);
+ String plan = BeamSqlCli.explainQuery(sql);
String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
+ " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
@@ -53,7 +54,7 @@ public class BeamPlannerExplainTest extends BasePlanner {
String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
+ " order_id, site_id, price " + "FROM ORDER_DETAILS "
+ "WHERE SITE_ID = 0 and price > 20";
- String plan = runner.executionPlan(sql);
+ String plan = BeamSqlCli.explainQuery(sql);
String expectedPlan =
"BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n"
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
index 7219d11..8a48618 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.dsls.sql.planner;
+import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
@@ -44,7 +45,7 @@ public class BeamPlannerSubmitTest extends BasePlanner {
+ " order_id, site_id, price "
+ "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
- PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+ PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
pipeline.run().waitUntilFinish();
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
index 4935c3b..a44b0d9 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
@@ -22,7 +22,8 @@ import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
-import org.apache.beam.dsls.sql.BeamSQLEnvironment;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -37,7 +38,6 @@ import org.junit.Test;
* Test for {@code BeamSortRel}.
*/
public class BeamSortRelTest {
- public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
@Rule
public final TestPipeline pipeline = TestPipeline.create();
@@ -71,7 +71,7 @@ public class BeamSortRelTest {
+ "ORDER BY order_id asc, site_id desc limit 4";
System.out.println(sql);
- runner.compileBeamPipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline);
pipeline.run().waitUntilFinish();
assertEquals(
@@ -88,7 +88,7 @@ public class BeamSortRelTest {
@Test
public void testOrderBy_nullsFirst() throws Exception {
- runner.addTableMetadata("ORDER_DETAILS", MockedBeamSQLTable
+ BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSQLTable
.of(SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
SqlTypeName.DOUBLE, "price",
@@ -98,7 +98,7 @@ public class BeamSortRelTest {
2L, 1, 3.0,
2L, null, 4.0,
5L, 5, 5.0));
- runner.addTableMetadata("SUB_ORDER_RAM", MockedBeamSQLTable
+ BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSQLTable
.of(SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
SqlTypeName.DOUBLE, "price"));
@@ -108,7 +108,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
- runner.compileBeamPipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline);
pipeline.run().waitUntilFinish();
assertEquals(
@@ -126,7 +126,7 @@ public class BeamSortRelTest {
@Test
public void testOrderBy_nullsLast() throws Exception {
- runner.addTableMetadata("ORDER_DETAILS", MockedBeamSQLTable
+ BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSQLTable
.of(SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
SqlTypeName.DOUBLE, "price",
@@ -136,7 +136,7 @@ public class BeamSortRelTest {
2L, 1, 3.0,
2L, null, 4.0,
5L, 5, 5.0));
- runner.addTableMetadata("SUB_ORDER_RAM", MockedBeamSQLTable
+ BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSQLTable
.of(SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
SqlTypeName.DOUBLE, "price"));
@@ -146,7 +146,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
- runner.compileBeamPipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline);
pipeline.run().waitUntilFinish();
assertEquals(
@@ -169,7 +169,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 4 offset 4";
- runner.compileBeamPipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline);
pipeline.run().waitUntilFinish();
assertEquals(
@@ -192,7 +192,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 11";
- runner.compileBeamPipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline);
pipeline.run().waitUntilFinish();
assertEquals(
@@ -223,13 +223,13 @@ public class BeamSortRelTest {
+ "ORDER BY order_id asc limit 11";
TestPipeline pipeline = TestPipeline.create();
- runner.compileBeamPipeline(sql, pipeline);
+ BeamSqlCli.compilePipeline(sql, pipeline);
}
@Before
public void prepare() {
- runner.addTableMetadata("ORDER_DETAILS", orderDetailTable);
- runner.addTableMetadata("SUB_ORDER_RAM", subOrderRamTable);
+ BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailTable);
+ BeamSqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable);
MockedBeamSQLTable.CONTENT.clear();
}