You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/07/05 16:34:29 UTC
[1/3] beam git commit: BeamSql: refactor the MockedBeamSqlTable and
related tests
Repository: beam
Updated Branches:
refs/heads/DSL_SQL 7ba77dd43 -> ca2bc723d
BeamSql: refactor the MockedBeamSqlTable and related tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21497194
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21497194
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21497194
Branch: refs/heads/DSL_SQL
Commit: 21497194db3ddce37a4747b3de2714b02684c57e
Parents: 7ba77dd
Author: James Xu <xu...@gmail.com>
Authored: Tue Jun 27 10:42:40 2017 +0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 5 09:33:40 2017 -0700
----------------------------------------------------------------------
.../dsls/sql/planner/MockedBeamSqlTable.java | 21 +++---
.../beam/dsls/sql/rel/BeamMinusRelTest.java | 1 -
.../beam/dsls/sql/rel/BeamSortRelTest.java | 79 ++++++++------------
.../beam/dsls/sql/rel/BeamValuesRelTest.java | 6 --
4 files changed, 42 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/21497194/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
index fa80cc1..bb10369 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
@@ -40,23 +40,16 @@ import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeName;
/**
- * A mock table use to check input/output.
- *
+ * Mocked table for bounded data sources.
*/
public class MockedBeamSqlTable extends BaseBeamTable {
- public static final AtomicInteger COUNTER = new AtomicInteger();
- public static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
-
+ private static final AtomicInteger COUNTER = new AtomicInteger();
+ private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
private List<BeamSqlRow> inputRecords;
public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) {
super(beamSqlRecordType);
}
- public MockedBeamSqlTable withInputRecords(List<BeamSqlRow> inputRecords){
- this.inputRecords = inputRecords;
- return this;
- }
-
/**
* Convenient way to build a mocked table with mock data:
*
@@ -81,6 +74,9 @@ public class MockedBeamSqlTable extends BaseBeamTable {
* 10L, 100, 10.0, new Date())
* }</pre>
*/
+ // FIXME: refactor this method
+ // 1) use Types rather than SqlTypeName
+ // 2) use RowsBuilder rather than duplicate the logic here
public static MockedBeamSqlTable of(final Object... args){
final RelProtoDataType protoRowType = new RelProtoDataType() {
@Override
@@ -112,7 +108,10 @@ public class MockedBeamSqlTable extends BaseBeamTable {
}
rows.add(row);
}
- return new MockedBeamSqlTable(beamSQLRecordType).withInputRecords(rows);
+ MockedBeamSqlTable table = new MockedBeamSqlTable(beamSQLRecordType);
+ table.inputRecords = rows;
+
+ return table;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/21497194/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
index 688ff8e..bb5e7ee 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
@@ -62,7 +62,6 @@ public class BeamMinusRelTest {
public void setUp() {
sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
- MockedBeamSqlTable.CONTENT.clear();
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/21497194/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
index 2519984..d5c18fc 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
@@ -18,16 +18,15 @@
package org.apache.beam.dsls.sql.rel;
-import java.util.Collection;
import java.util.Date;
-import java.util.Iterator;
import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -70,20 +69,17 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 4";
- System.out.println(sql);
- BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
+ SqlTypeName.BIGINT, "order_id",
+ SqlTypeName.INTEGER, "site_id",
+ SqlTypeName.DOUBLE, "price",
+ 1L, 2, 1.0,
+ 1L, 1, 2.0,
+ 2L, 4, 3.0,
+ 2L, 1, 4.0
+ ).getInputRecords());
pipeline.run().waitUntilFinish();
-
- assertEquals(
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
- 1L, 2, 1.0,
- 1L, 1, 2.0,
- 2L, 4, 3.0,
- 2L, 1, 4.0
- ).getInputRecords(), MockedBeamSqlTable.CONTENT);
}
@Test
@@ -108,10 +104,8 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
- BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- pipeline.run().waitUntilFinish();
-
- assertEquals(
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PAssert.that(rows).containsInAnyOrder(
MockedBeamSqlTable.of(
SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
@@ -121,7 +115,9 @@ public class BeamSortRelTest {
1L, 2, 1.0,
2L, null, 4.0,
2L, 1, 3.0
- ).getInputRecords(), MockedBeamSqlTable.CONTENT);
+ ).getInputRecords()
+ );
+ pipeline.run().waitUntilFinish();
}
@Test
@@ -146,10 +142,8 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
- BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- pipeline.run().waitUntilFinish();
-
- assertEquals(
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PAssert.that(rows).containsInAnyOrder(
MockedBeamSqlTable.of(
SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
@@ -159,7 +153,9 @@ public class BeamSortRelTest {
1L, null, 2.0,
2L, 1, 3.0,
2L, null, 4.0
- ).getInputRecords(), MockedBeamSqlTable.CONTENT);
+ ).getInputRecords()
+ );
+ pipeline.run().waitUntilFinish();
}
@Test
@@ -169,10 +165,8 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 4 offset 4";
- BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- pipeline.run().waitUntilFinish();
-
- assertEquals(
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PAssert.that(rows).containsInAnyOrder(
MockedBeamSqlTable.of(
SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
@@ -182,7 +176,9 @@ public class BeamSortRelTest {
6L, 6, 6.0,
7L, 7, 7.0,
8L, 8888, 8.0
- ).getInputRecords(), MockedBeamSqlTable.CONTENT);
+ ).getInputRecords()
+ );
+ pipeline.run().waitUntilFinish();
}
@Test
@@ -192,10 +188,8 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 11";
- BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- pipeline.run().waitUntilFinish();
-
- assertEquals(
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PAssert.that(rows).containsInAnyOrder(
MockedBeamSqlTable.of(
SqlTypeName.BIGINT, "order_id",
SqlTypeName.INTEGER, "site_id",
@@ -211,7 +205,9 @@ public class BeamSortRelTest {
8L, 8888, 8.0,
8L, 999, 9.0,
10L, 100, 10.0
- ).getInputRecords(), MockedBeamSqlTable.CONTENT);
+ ).getInputRecords()
+ );
+ pipeline.run().waitUntilFinish();
}
@Test(expected = UnsupportedOperationException.class)
@@ -230,16 +226,5 @@ public class BeamSortRelTest {
public void prepare() {
sqlEnv.registerTable("ORDER_DETAILS", orderDetailTable);
sqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable);
- MockedBeamSqlTable.CONTENT.clear();
- }
-
- private void assertEquals(Collection<BeamSqlRow> rows1, Collection<BeamSqlRow> rows2) {
- Assert.assertEquals(rows1.size(), rows2.size());
-
- Iterator<BeamSqlRow> it1 = rows1.iterator();
- Iterator<BeamSqlRow> it2 = rows2.iterator();
- while (it1.hasNext()) {
- Assert.assertEquals(it1.next(), it2.next());
- }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21497194/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
index 9a5070a..81b1a13 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -89,9 +88,4 @@ public class BeamValuesRelTest {
sqlEnv.registerTable("string_table", stringTable);
sqlEnv.registerTable("int_table", intTable);
}
-
- @Before
- public void prepare() {
- MockedBeamSqlTable.CONTENT.clear();
- }
}
[2/3] beam git commit: MockedBeamSqlTable -> MockedBoundedTable
Posted by lc...@apache.org.
MockedBeamSqlTable -> MockedBoundedTable
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc66698e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc66698e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc66698e
Branch: refs/heads/DSL_SQL
Commit: bc66698e6880c7788bcea78006c67bfca66b17ce
Parents: 2149719
Author: James Xu <xu...@gmail.com>
Authored: Fri Jun 30 14:54:26 2017 +0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 5 09:33:53 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/dsls/sql/TestUtils.java | 81 +++++++---
.../beam/dsls/sql/mock/MockedBoundedTable.java | 126 +++++++++++++++
.../apache/beam/dsls/sql/mock/MockedTable.java | 42 +++++
.../dsls/sql/mock/MockedUnboundedTable.java | 113 +++++++++++++
.../dsls/sql/planner/MockedBeamSqlTable.java | 162 -------------------
.../beam/dsls/sql/planner/MockedTable.java | 33 ----
.../dsls/sql/planner/MockedUnboundedTable.java | 120 --------------
.../beam/dsls/sql/rel/BeamIntersectRelTest.java | 78 ++++-----
.../rel/BeamJoinRelBoundedVsBoundedTest.java | 141 ++++++++--------
.../rel/BeamJoinRelUnboundedVsBoundedTest.java | 21 ++-
.../BeamJoinRelUnboundedVsUnboundedTest.java | 10 +-
.../beam/dsls/sql/rel/BeamMinusRelTest.java | 77 +++++----
.../sql/rel/BeamSetOperatorRelBaseTest.java | 68 +++-----
.../beam/dsls/sql/rel/BeamSortRelTest.java | 161 +++++++++---------
.../beam/dsls/sql/rel/BeamUnionRelTest.java | 47 +++---
.../beam/dsls/sql/rel/BeamValuesRelTest.java | 72 +++++----
16 files changed, 691 insertions(+), 661 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
index 375027a..cfad333 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
@@ -20,7 +20,6 @@ package org.apache.beam.dsls.sql;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.DoFn;
@@ -62,7 +61,7 @@ public class TestUtils {
* Types.INTEGER, "order_id",
* Types.INTEGER, "sum_site_id",
* Types.VARCHAR, "buyer"
- * ).values(
+ * ).addRows(
* 1, 3, "james",
* 2, 5, "bond"
* ).getStringRows()
@@ -81,15 +80,7 @@ public class TestUtils {
* @args pairs of column type and column names.
*/
public static RowsBuilder of(final Object... args) {
- List<Integer> types = new ArrayList<>();
- List<String> names = new ArrayList<>();
- int lastTypeIndex = 0;
- for (; lastTypeIndex < args.length; lastTypeIndex += 2) {
- types.add((int) args[lastTypeIndex]);
- names.add((String) args[lastTypeIndex + 1]);
- }
-
- BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.create(names, types);
+ BeamSqlRecordType beamSQLRecordType = buildBeamSqlRecordType(args);
RowsBuilder builder = new RowsBuilder();
builder.type = beamSQLRecordType;
@@ -97,20 +88,12 @@ public class TestUtils {
}
/**
- * Add values to the builder.
+ * Add rows to the builder.
*
* <p>Note: check the class javadoc for a detailed example.
*/
- public RowsBuilder values(final Object... args) {
- int fieldCount = type.size();
- for (int i = 0; i < args.length; i += fieldCount) {
- BeamSqlRow row = new BeamSqlRow(type);
- for (int j = 0; j < fieldCount; j++) {
- row.addField(j, args[i + j]);
- }
- this.rows.add(row);
- }
-
+ public RowsBuilder addRows(final Object... args) {
+ this.rows.addAll(buildRows(type, args));
return this;
}
@@ -122,4 +105,58 @@ public class TestUtils {
return beamSqlRows2Strings(rows);
}
}
+
+ /**
+ * Convenient way to build a {@code BeamSqlRecordType}.
+ *
+ * <p>e.g.
+ *
+ * <pre>{@code
+ * buildBeamSqlRecordType(
+ * Types.BIGINT, "order_id",
+ * Types.INTEGER, "site_id",
+ * Types.DOUBLE, "price",
+ * Types.TIMESTAMP, "order_time"
+ * )
+ * }</pre>
+ */
+ public static BeamSqlRecordType buildBeamSqlRecordType(Object... args) {
+ List<Integer> types = new ArrayList<>();
+ List<String> names = new ArrayList<>();
+
+ for (int i = 0; i < args.length - 1; i += 2) {
+ types.add((int) args[i]);
+ names.add((String) args[i + 1]);
+ }
+
+ return BeamSqlRecordType.create(names, types);
+ }
+
+ /**
+ * Convenient way to build a list of {@code BeamSqlRow}s.
+ *
+ * <p>e.g.
+ *
+ * <pre>{@code
+ * buildRows(
+ * recordType,
+ * 1, 1, 1, // the first row
+ * 2, 2, 2, // the second row
+ * ...
+ * )
+ * }</pre>
+ */
+ public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, Object... args) {
+ List<BeamSqlRow> rows = new ArrayList<>();
+ int fieldCount = type.size();
+
+ for (int i = 0; i < args.length; i += fieldCount) {
+ BeamSqlRow row = new BeamSqlRow(type);
+ for (int j = 0; j < fieldCount; j++) {
+ row.addField(j, args[i + j]);
+ }
+ rows.add(row);
+ }
+ return rows;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
new file mode 100644
index 0000000..0fb8a80
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.mock;
+
+import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
+import static org.apache.beam.dsls.sql.TestUtils.buildRows;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * Mocked table for bounded data sources.
+ */
+public class MockedBoundedTable extends MockedTable {
+ /** rows written to this table. */
+ private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
+ /** rows flow out from this table. */
+ private final List<BeamSqlRow> rows = new ArrayList<>();
+
+ public MockedBoundedTable(BeamSqlRecordType beamSqlRecordType) {
+ super(beamSqlRecordType);
+ }
+
+ /**
+ * Convenient way to build a mocked bounded table.
+ *
+ * <p>e.g.
+ *
+ * <pre>{@code
+ * MockedBoundedTable
+ * .of(Types.BIGINT, "order_id",
+ * Types.INTEGER, "site_id",
+ * Types.DOUBLE, "price",
+ * Types.TIMESTAMP, "order_time")
+ * }</pre>
+ */
+ public static MockedBoundedTable of(final Object... args){
+ return new MockedBoundedTable(buildBeamSqlRecordType(args));
+ }
+
+
+ /**
+ * Add rows to this table.
+ *
+ * <p>Sample usage:
+ *
+ * <pre>{@code
+ * addRows(
+ * 1, 3, "james", -- first row
+ * 2, 5, "bond" -- second row
+ * ...
+ * )
+ * }</pre>
+ */
+ public MockedBoundedTable addRows(Object... args) {
+ List<BeamSqlRow> rows = buildRows(getRecordType(), args);
+ this.rows.addAll(rows);
+ return this;
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return BeamIOType.BOUNDED;
+ }
+
+ @Override
+ public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ return PBegin.in(pipeline).apply(
+ "MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows));
+ }
+
+ @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ return new OutputStore();
+ }
+
+ /**
+ * Keep output in {@code CONTENT} for validation.
+ *
+ */
+ public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> {
+
+ @Override
+ public PDone expand(PCollection<BeamSqlRow> input) {
+ input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ CONTENT.add(c.element());
+ }
+
+ @Teardown
+ public void close() {
+ CONTENT.clear();
+ }
+
+ }));
+ return PDone.in(input.getPipeline());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
new file mode 100644
index 0000000..eed740a
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.mock;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * Base class for mocked table.
+ */
+public abstract class MockedTable extends BaseBeamTable {
+ public static final AtomicInteger COUNTER = new AtomicInteger();
+ public MockedTable(BeamSqlRecordType beamSqlRecordType) {
+ super(beamSqlRecordType);
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ throw new UnsupportedOperationException("buildIOWriter unsupported!");
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
new file mode 100644
index 0000000..12d8d37
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.mock;
+
+import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
+import static org.apache.beam.dsls.sql.TestUtils.buildRows;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.calcite.util.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A mocked unbounded table.
+ */
+public class MockedUnboundedTable extends MockedTable {
+ /** rows flow out from this table with the specified watermark instant. */
+ private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
+ /** specify the index of column in the row which stands for the event time field. */
+ private int timestampField;
+ private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) {
+ super(beamSqlRecordType);
+ }
+
+ /**
+ * Convenient way to build a mocked unbounded table.
+ *
+ * <p>e.g.
+ *
+ * <pre>{@code
+ * MockedUnboundedTable
+ * .of(Types.BIGINT, "order_id",
+ * Types.INTEGER, "site_id",
+ * Types.DOUBLE, "price",
+ * Types.TIMESTAMP, "order_time")
+ * }</pre>
+ */
+ public static MockedUnboundedTable of(final Object... args){
+ return new MockedUnboundedTable(buildBeamSqlRecordType(args));
+ }
+
+ public MockedUnboundedTable timestampColumnIndex(int idx) {
+ this.timestampField = idx;
+ return this;
+ }
+
+ /**
+ * Add rows to this table.
+ *
+ * <p>Sample usage:
+ *
+ * <pre>{@code
+ * addRows(
+ * duration, -- duration which stands for the corresponding watermark instant
+ * 1, 3, "james", -- first row
+ * 2, 5, "bond" -- second row
+ * ...
+ * )
+ * }</pre>
+ */
+ public MockedUnboundedTable addRows(Duration duration, Object... args) {
+ List<BeamSqlRow> rows = buildRows(getRecordType(), args);
+ // record the watermark + rows
+ this.timestampedRows.add(Pair.of(duration, rows));
+ return this;
+ }
+
+ @Override public BeamIOType getSourceType() {
+ return BeamIOType.UNBOUNDED;
+ }
+
+ @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ TestStream.Builder<BeamSqlRow> values = TestStream.create(
+ new BeamSqlRowCoder(beamSqlRecordType));
+
+ for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
+ values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));
+ for (int i = 0; i < pair.getValue().size(); i++) {
+ values = values.addElements(TimestampedValue.of(pair.getValue().get(i),
+ new Instant(pair.getValue().get(i).getDate(timestampField))));
+ }
+ }
+
+ return pipeline.begin().apply(
+ "MockedUnboundedTable_" + COUNTER.incrementAndGet(),
+ values.advanceWatermarkToInfinity());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
deleted file mode 100644
index bb10369..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Mocked table for bounded data sources.
- */
-public class MockedBeamSqlTable extends BaseBeamTable {
- private static final AtomicInteger COUNTER = new AtomicInteger();
- private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
- private List<BeamSqlRow> inputRecords;
- public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) {
- super(beamSqlRecordType);
- }
-
- /**
- * Convenient way to build a mocked table with mock data:
- *
- * <p>e.g.
- *
- * <pre>{@code
- * MockedBeamSqlTable
- * .of(SqlTypeName.BIGINT, "order_id",
- * SqlTypeName.INTEGER, "site_id",
- * SqlTypeName.DOUBLE, "price",
- * SqlTypeName.TIMESTAMP, "order_time",
- *
- * 1L, 2, 1.0, new Date(),
- * 1L, 1, 2.0, new Date(),
- * 2L, 4, 3.0, new Date(),
- * 2L, 1, 4.0, new Date(),
- * 5L, 5, 5.0, new Date(),
- * 6L, 6, 6.0, new Date(),
- * 7L, 7, 7.0, new Date(),
- * 8L, 8888, 8.0, new Date(),
- * 8L, 999, 9.0, new Date(),
- * 10L, 100, 10.0, new Date())
- * }</pre>
- */
- // FIXME: refactor this method
- // 1) use Types rather than SqlTypeName
- // 2) use RowsBuilder rather than duplicate the logic here
- public static MockedBeamSqlTable of(final Object... args){
- final RelProtoDataType protoRowType = new RelProtoDataType() {
- @Override
- public RelDataType apply(RelDataTypeFactory a0) {
- RelDataTypeFactory.FieldInfoBuilder builder = a0.builder();
-
- int lastTypeIndex = 0;
- for (; lastTypeIndex < args.length; lastTypeIndex += 2) {
- if (args[lastTypeIndex] instanceof SqlTypeName) {
- builder.add(args[lastTypeIndex + 1].toString(),
- (SqlTypeName) args[lastTypeIndex]);
- } else {
- break;
- }
- }
- return builder.build();
- }
- };
-
- List<BeamSqlRow> rows = new ArrayList<>();
- BeamSqlRecordType beamSQLRecordType = CalciteUtils
- .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
- int fieldCount = beamSQLRecordType.size();
-
- for (int i = fieldCount * 2; i < args.length; i += fieldCount) {
- BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
- for (int j = 0; j < fieldCount; j++) {
- row.addField(j, args[i + j]);
- }
- rows.add(row);
- }
- MockedBeamSqlTable table = new MockedBeamSqlTable(beamSQLRecordType);
- table.inputRecords = rows;
-
- return table;
- }
-
- @Override
- public BeamIOType getSourceType() {
- return BeamIOType.BOUNDED;
- }
-
- @Override
-
- public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
- return PBegin.in(pipeline).apply(
- "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords));
- }
-
- @Override
- public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
- return new OutputStore();
- }
-
- public List<BeamSqlRow> getInputRecords() {
- return inputRecords;
- }
-
- /**
- * Keep output in {@code CONTENT} for validation.
- *
- */
- public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> {
-
- @Override
- public PDone expand(PCollection<BeamSqlRow> input) {
- input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- CONTENT.add(c.element());
- }
-
- @Teardown
- public void close() {
-
- }
-
- }));
- return PDone.in(input.getPipeline());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
deleted file mode 100644
index d096a61..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.planner;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
-
-/**
- * Base class for mocked table.
- */
-public abstract class MockedTable extends BaseBeamTable {
- public static final AtomicInteger COUNTER = new AtomicInteger();
- public MockedTable(BeamSqlRecordType beamSqlRecordType) {
- super(beamSqlRecordType);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
deleted file mode 100644
index 3f22df3..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.planner;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.calcite.util.Pair;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A mocked unbounded table.
- */
-public class MockedUnboundedTable extends MockedTable {
- private List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
- private int timestampField;
- private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) {
- super(beamSqlRecordType);
- }
-
- /**
- * Convenient way to build a mocked table.
- *
- * <p>e.g.
- *
- * <pre>{@code
- * MockedUnboundedTable
- * .of(Types.BIGINT, "order_id",
- * Types.INTEGER, "site_id",
- * Types.DOUBLE, "price",
- * Types.TIMESTAMP, "order_time")
- * }</pre>
- */
- public static MockedUnboundedTable of(final Object... args){
- List<Integer> types = new ArrayList<>();
- List<String> names = new ArrayList<>();
- int lastTypeIndex = 0;
- for (; lastTypeIndex < args.length; lastTypeIndex += 2) {
- types.add((int) args[lastTypeIndex]);
- names.add((String) args[lastTypeIndex + 1]);
- }
-
- return new MockedUnboundedTable(
- BeamSqlRecordType.create(names, types)
- );
- }
-
- public MockedUnboundedTable timestampColumnIndex(int idx) {
- this.timestampField = idx;
- return this;
- }
-
- public MockedUnboundedTable addRows(Duration duration, Object... args) {
- List<BeamSqlRow> rows = new ArrayList<>();
- int fieldCount = getRecordType().size();
-
- for (int i = 0; i < args.length; i += fieldCount) {
- BeamSqlRow row = new BeamSqlRow(getRecordType());
- for (int j = 0; j < fieldCount; j++) {
- row.addField(j, args[i + j]);
- }
- rows.add(row);
- }
-
- // record the watermark + rows
- this.timestampedRows.add(Pair.of(duration, rows));
- return this;
- }
-
- @Override public BeamIOType getSourceType() {
- return BeamIOType.UNBOUNDED;
- }
-
- @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
- TestStream.Builder<BeamSqlRow> values = TestStream.create(
- new BeamSqlRowCoder(beamSqlRecordType));
-
- for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
- values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));
- for (int i = 0; i < pair.getValue().size(); i++) {
- values = values.addElements(TimestampedValue.of(pair.getValue().get(i),
- new Instant(pair.getValue().get(i).getDate(timestampField))));
- }
- }
-
- return pipeline.begin().apply(
- "MockedUnboundedTable_" + COUNTER.incrementAndGet(),
- values.advanceWatermarkToInfinity());
- }
-
- @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
- throw new UnsupportedOperationException("MockedUnboundedTable#buildIOWriter unsupported!");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
index 47fdc16..3b37143 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
@@ -18,14 +18,15 @@
package org.apache.beam.dsls.sql.rel;
+import java.sql.Types;
import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -38,29 +39,33 @@ public class BeamIntersectRelTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable
- .of(SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
- 1L, 1, 1.0,
- 1L, 1, 1.0,
- 2L, 2, 2.0,
- 4L, 4, 4.0
- );
-
- private static MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable
- .of(SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
- 1L, 1, 1.0,
- 2L, 2, 2.0,
- 3L, 3, 3.0
- );
@BeforeClass
- public static void setUp() {
- sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
- sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+ public static void prepare() {
+ sqlEnv.registerTable("ORDER_DETAILS1",
+ MockedBoundedTable.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
+ 1L, 1, 1.0,
+ 1L, 1, 1.0,
+ 2L, 2, 2.0,
+ 4L, 4, 4.0
+ )
+ );
+
+ sqlEnv.registerTable("ORDER_DETAILS2",
+ MockedBoundedTable.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
+ 1L, 1, 1.0,
+ 2L, 2, 2.0,
+ 3L, 3, 3.0
+ )
+ );
}
@Test
@@ -74,14 +79,14 @@ public class BeamIntersectRelTest {
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
-
- 1L, 1, 1.0,
- 2L, 2, 2.0
- ).getInputRecords());
+ TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
+ 1L, 1, 1.0,
+ 2L, 2, 2.0
+ ).getRows());
pipeline.run().waitUntilFinish();
}
@@ -99,14 +104,15 @@ public class BeamIntersectRelTest {
PAssert.that(rows).satisfies(new CheckSize(3));
PAssert.that(rows).containsInAnyOrder(
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
+ TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
1L, 1, 1.0,
1L, 1, 1.0,
2L, 2, 2.0
- ).getInputRecords());
+ ).getRows());
pipeline.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
index 505b742..d15cb81 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -18,14 +18,15 @@
package org.apache.beam.dsls.sql.rel;
+import java.sql.Types;
import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -41,24 +42,28 @@ public class BeamJoinRelBoundedVsBoundedTest {
@BeforeClass
public static void prepare() {
beamSqlEnv.registerTable("ORDER_DETAILS",
- MockedBeamSqlTable
- .of(SqlTypeName.INTEGER, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.INTEGER, "price",
-
- 1, 2, 3,
- 2, 3, 3,
- 3, 4, 5));
+ MockedBoundedTable.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "site_id",
+ Types.INTEGER, "price"
+ ).addRows(
+ 1, 2, 3,
+ 2, 3, 3,
+ 3, 4, 5
+ )
+ );
beamSqlEnv.registerTable("ORDER_DETAILS0",
- MockedBeamSqlTable
- .of(SqlTypeName.INTEGER, "order_id0",
- SqlTypeName.INTEGER, "site_id0",
- SqlTypeName.INTEGER, "price0",
-
- 1, 2, 3,
- 2, 3, 3,
- 3, 4, 5));
+ MockedBoundedTable.of(
+ Types.INTEGER, "order_id0",
+ Types.INTEGER, "site_id0",
+ Types.INTEGER, "price0"
+ ).addRows(
+ 1, 2, 3,
+ 2, 3, 3,
+ 3, 4, 5
+ )
+ );
}
@@ -73,16 +78,17 @@ public class BeamJoinRelBoundedVsBoundedTest {
;
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
- SqlTypeName.INTEGER, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.INTEGER, "price",
- SqlTypeName.INTEGER, "order_id0",
- SqlTypeName.INTEGER, "site_id0",
- SqlTypeName.INTEGER, "price0",
-
- 2, 3, 3, 1, 2, 3
- ).getInputRecords());
+ PAssert.that(rows).containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "site_id",
+ Types.INTEGER, "price",
+ Types.INTEGER, "order_id0",
+ Types.INTEGER, "site_id0",
+ Types.INTEGER, "price0"
+ ).addRows(
+ 2, 3, 3, 1, 2, 3
+ ).getRows());
pipeline.run();
}
@@ -98,18 +104,19 @@ public class BeamJoinRelBoundedVsBoundedTest {
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
pipeline.enableAbandonedNodeEnforcement(false);
- PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
- SqlTypeName.INTEGER, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.INTEGER, "price",
- SqlTypeName.INTEGER, "order_id0",
- SqlTypeName.INTEGER, "site_id0",
- SqlTypeName.INTEGER, "price0",
-
- 1, 2, 3, null, null, null,
- 2, 3, 3, 1, 2, 3,
- 3, 4, 5, null, null, null
- ).getInputRecords());
+ PAssert.that(rows).containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "site_id",
+ Types.INTEGER, "price",
+ Types.INTEGER, "order_id0",
+ Types.INTEGER, "site_id0",
+ Types.INTEGER, "price0"
+ ).addRows(
+ 1, 2, 3, null, null, null,
+ 2, 3, 3, 1, 2, 3,
+ 3, 4, 5, null, null, null
+ ).getRows());
pipeline.run();
}
@@ -124,18 +131,19 @@ public class BeamJoinRelBoundedVsBoundedTest {
;
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
- SqlTypeName.INTEGER, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.INTEGER, "price",
- SqlTypeName.INTEGER, "order_id0",
- SqlTypeName.INTEGER, "site_id0",
- SqlTypeName.INTEGER, "price0",
-
- 2, 3, 3, 1, 2, 3,
- null, null, null, 2, 3, 3,
- null, null, null, 3, 4, 5
- ).getInputRecords());
+ PAssert.that(rows).containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "site_id",
+ Types.INTEGER, "price",
+ Types.INTEGER, "order_id0",
+ Types.INTEGER, "site_id0",
+ Types.INTEGER, "price0"
+ ).addRows(
+ 2, 3, 3, 1, 2, 3,
+ null, null, null, 2, 3, 3,
+ null, null, null, 3, 4, 5
+ ).getRows());
pipeline.run();
}
@@ -150,20 +158,21 @@ public class BeamJoinRelBoundedVsBoundedTest {
;
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
- SqlTypeName.INTEGER, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.INTEGER, "price",
- SqlTypeName.INTEGER, "order_id0",
- SqlTypeName.INTEGER, "site_id0",
- SqlTypeName.INTEGER, "price0",
-
- 2, 3, 3, 1, 2, 3,
- 1, 2, 3, null, null, null,
- 3, 4, 5, null, null, null,
- null, null, null, 2, 3, 3,
- null, null, null, 3, 4, 5
- ).getInputRecords());
+ PAssert.that(rows).containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "site_id",
+ Types.INTEGER, "price",
+ Types.INTEGER, "order_id0",
+ Types.INTEGER, "site_id0",
+ Types.INTEGER, "price0"
+ ).addRows(
+ 2, 3, 3, 1, 2, 3,
+ 1, 2, 3, null, null, null,
+ 3, 4, 5, null, null, null,
+ null, null, null, 2, 3, 3,
+ null, null, null, 3, 4, 5
+ ).getRows());
pipeline.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 2ddb00b..3f0c98e 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -23,15 +23,14 @@ import java.util.Date;
import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
-import org.apache.beam.dsls.sql.planner.MockedUnboundedTable;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
+import org.apache.beam.dsls.sql.mock.MockedUnboundedTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.joda.time.Duration;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -79,10 +78,10 @@ public class BeamJoinRelUnboundedVsBoundedTest {
)
);
- beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBeamSqlTable
- .of(SqlTypeName.INTEGER, "order_id",
- SqlTypeName.VARCHAR, "buyer",
-
+ beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable
+ .of(Types.INTEGER, "order_id",
+ Types.VARCHAR, "buyer"
+ ).addRows(
1, "james",
2, "bond"
));
@@ -106,7 +105,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
Types.INTEGER, "order_id",
Types.INTEGER, "sum_site_id",
Types.VARCHAR, "buyer"
- ).values(
+ ).addRows(
1, 3, "james",
2, 5, "bond"
).getStringRows()
@@ -132,7 +131,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
Types.INTEGER, "order_id",
Types.INTEGER, "sum_site_id",
Types.VARCHAR, "buyer"
- ).values(
+ ).addRows(
1, 3, "james",
2, 5, "bond"
).getStringRows()
@@ -159,7 +158,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
Types.INTEGER, "order_id",
Types.INTEGER, "sum_site_id",
Types.VARCHAR, "buyer"
- ).values(
+ ).addRows(
1, 3, "james",
2, 5, "bond",
3, 3, null
@@ -200,7 +199,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
Types.INTEGER, "order_id",
Types.INTEGER, "sum_site_id",
Types.VARCHAR, "buyer"
- ).values(
+ ).addRows(
1, 3, "james",
2, 5, "bond",
3, 3, null
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
index 18a5f60..d76e875 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -23,7 +23,7 @@ import java.util.Date;
import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.planner.MockedUnboundedTable;
+import org.apache.beam.dsls.sql.mock.MockedUnboundedTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn;
import org.apache.beam.sdk.testing.PAssert;
@@ -95,7 +95,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
Types.INTEGER, "order_id",
Types.INTEGER, "sum_site_id",
Types.INTEGER, "order_id0",
- Types.INTEGER, "sum_site_id0").values(
+ Types.INTEGER, "sum_site_id0").addRows(
1, 3, 1, 3,
2, 5, 2, 5
).getStringRows()
@@ -129,7 +129,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
Types.INTEGER, "sum_site_id",
Types.INTEGER, "order_id0",
Types.INTEGER, "sum_site_id0"
- ).values(
+ ).addRows(
1, 1, 1, 3,
2, 2, null, null,
2, 2, 2, 5,
@@ -159,7 +159,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
Types.INTEGER, "sum_site_id",
Types.INTEGER, "order_id0",
Types.INTEGER, "sum_site_id0"
- ).values(
+ ).addRows(
1, 3, 1, 1,
null, null, 2, 2,
2, 5, 2, 2,
@@ -190,7 +190,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
Types.INTEGER, "sum_site_id",
Types.INTEGER, "order_id",
Types.INTEGER, "sum_site_id0"
- ).values(
+ ).addRows(
1, 1, 1, 3,
6, 2, null, null,
7, 2, null, null,
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
index bb5e7ee..80da8fb 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
@@ -18,15 +18,16 @@
package org.apache.beam.dsls.sql.rel;
+import java.sql.Types;
import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -38,30 +39,34 @@ public class BeamMinusRelTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable
- .of(SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
- 1L, 1, 1.0,
- 1L, 1, 1.0,
- 2L, 2, 2.0,
- 4L, 4, 4.0,
- 4L, 4, 4.0
- );
- private MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable
- .of(SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
- 1L, 1, 1.0,
- 2L, 2, 2.0,
- 3L, 3, 3.0
- );
+ @BeforeClass
+ public static void prepare() {
+ sqlEnv.registerTable("ORDER_DETAILS1",
+ MockedBoundedTable.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
+ 1L, 1, 1.0,
+ 1L, 1, 1.0,
+ 2L, 2, 2.0,
+ 4L, 4, 4.0,
+ 4L, 4, 4.0
+ )
+ );
- @Before
- public void setUp() {
- sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
- sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+ sqlEnv.registerTable("ORDER_DETAILS2",
+ MockedBoundedTable.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
+ 1L, 1, 1.0,
+ 2L, 2, 2.0,
+ 3L, 3, 3.0
+ )
+ );
}
@Test
@@ -75,12 +80,13 @@ public class BeamMinusRelTest {
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
+ TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
4L, 4, 4.0
- ).getInputRecords());
+ ).getRows());
pipeline.run();
}
@@ -98,13 +104,14 @@ public class BeamMinusRelTest {
PAssert.that(rows).satisfies(new CheckSize(2));
PAssert.that(rows).containsInAnyOrder(
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
+ TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
4L, 4, 4.0,
4L, 4, 4.0
- ).getInputRecords());
+ ).getRows());
pipeline.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
index f10a767..d0b01df 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
@@ -18,22 +18,19 @@
package org.apache.beam.dsls.sql.rel;
-import java.util.ArrayList;
+import java.sql.Types;
import java.util.Date;
-import java.util.List;
-
import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -46,20 +43,21 @@ public class BeamSetOperatorRelBaseTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- public static final Date THE_DATE = new Date();
- private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable
- .of(SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
- SqlTypeName.TIMESTAMP, "order_time",
-
- 1L, 1, 1.0, THE_DATE,
- 2L, 2, 2.0, THE_DATE);
+ public static final Date THE_DATE = new Date(100000);
@BeforeClass
public static void prepare() {
- THE_DATE.setTime(100000);
- sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+ sqlEnv.registerTable("ORDER_DETAILS",
+ MockedBoundedTable.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price",
+ Types.TIMESTAMP, "order_time"
+ ).addRows(
+ 1L, 1, 1.0, THE_DATE,
+ 2L, 2, 2.0, THE_DATE
+ )
+ );
}
@Test
@@ -74,17 +72,17 @@ public class BeamSetOperatorRelBaseTest {
+ ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- List<BeamSqlRow> expRows =
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.BIGINT, "cnt",
-
- 1L, 1, 1L,
- 2L, 2, 1L
- ).getInputRecords();
// compare valueInString to ignore the windowStart & windowEnd
- PAssert.that(rows.apply(ParDo.of(new ToString()))).containsInAnyOrder(toString(expRows));
+ PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+ .containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.BIGINT, "cnt"
+ ).addRows(
+ 1L, 1, 1L,
+ 2L, 2, 1L
+ ).getStringRows());
pipeline.run();
}
@@ -105,20 +103,4 @@ public class BeamSetOperatorRelBaseTest {
BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv);
pipeline.run();
}
-
- static class ToString extends DoFn<BeamSqlRow, String> {
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- ctx.output(ctx.element().valueInString());
- }
- }
-
- static List<String> toString (List<BeamSqlRow> rows) {
- List<String> strs = new ArrayList<>();
- for (BeamSqlRow row : rows) {
- strs.add(row.valueInString());
- }
-
- return strs;
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
index d5c18fc..1067926 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
@@ -18,15 +18,16 @@
package org.apache.beam.dsls.sql.rel;
+import java.sql.Types;
import java.util.Date;
import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -40,27 +41,35 @@ public class BeamSortRelTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- private static MockedBeamSqlTable subOrderRamTable = MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price");
-
- private static MockedBeamSqlTable orderDetailTable = MockedBeamSqlTable
- .of(SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
- SqlTypeName.TIMESTAMP, "order_time",
-
- 1L, 2, 1.0, new Date(),
- 1L, 1, 2.0, new Date(),
- 2L, 4, 3.0, new Date(),
- 2L, 1, 4.0, new Date(),
- 5L, 5, 5.0, new Date(),
- 6L, 6, 6.0, new Date(),
- 7L, 7, 7.0, new Date(),
- 8L, 8888, 8.0, new Date(),
- 8L, 999, 9.0, new Date(),
- 10L, 100, 10.0, new Date());
+ @Before
+ public void prepare() {
+ sqlEnv.registerTable("ORDER_DETAILS",
+ MockedBoundedTable.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price",
+ Types.TIMESTAMP, "order_time"
+ ).addRows(
+ 1L, 2, 1.0, new Date(),
+ 1L, 1, 2.0, new Date(),
+ 2L, 4, 3.0, new Date(),
+ 2L, 1, 4.0, new Date(),
+ 5L, 5, 5.0, new Date(),
+ 6L, 6, 6.0, new Date(),
+ 7L, 7, 7.0, new Date(),
+ 8L, 8888, 8.0, new Date(),
+ 8L, 999, 9.0, new Date(),
+ 10L, 100, 10.0, new Date()
+ )
+ );
+ sqlEnv.registerTable("SUB_ORDER_RAM",
+ MockedBoundedTable.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ )
+ );
+ }
@Test
public void testOrderBy_basic() throws Exception {
@@ -70,34 +79,38 @@ public class BeamSortRelTest {
+ "ORDER BY order_id asc, site_id desc limit 4";
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
+ PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
1L, 2, 1.0,
1L, 1, 2.0,
2L, 4, 3.0,
2L, 1, 4.0
- ).getInputRecords());
+ ).getRows());
pipeline.run().waitUntilFinish();
}
@Test
public void testOrderBy_nullsFirst() throws Exception {
- sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
- .of(SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
-
+ sqlEnv.registerTable("ORDER_DETAILS",
+ MockedBoundedTable.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
1L, 2, 1.0,
1L, null, 2.0,
2L, 1, 3.0,
2L, null, 4.0,
- 5L, 5, 5.0));
- sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
- .of(SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price"));
+ 5L, 5, 5.0
+ )
+ );
+ sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
+ .of(Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"));
String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
+ " order_id, site_id, price "
@@ -106,36 +119,36 @@ public class BeamSortRelTest {
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
-
+ TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
1L, null, 2.0,
1L, 2, 1.0,
2L, null, 4.0,
2L, 1, 3.0
- ).getInputRecords()
+ ).getRows()
);
pipeline.run().waitUntilFinish();
}
@Test
public void testOrderBy_nullsLast() throws Exception {
- sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
- .of(SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
-
+ sqlEnv.registerTable("ORDER_DETAILS", MockedBoundedTable
+ .of(Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
1L, 2, 1.0,
1L, null, 2.0,
2L, 1, 3.0,
2L, null, 4.0,
5L, 5, 5.0));
- sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
- .of(SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price"));
+ sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
+ .of(Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"));
String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
+ " order_id, site_id, price "
@@ -144,16 +157,16 @@ public class BeamSortRelTest {
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
-
+ TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
1L, 2, 1.0,
1L, null, 2.0,
2L, 1, 3.0,
2L, null, 4.0
- ).getInputRecords()
+ ).getRows()
);
pipeline.run().waitUntilFinish();
}
@@ -167,16 +180,16 @@ public class BeamSortRelTest {
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
-
+ TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
5L, 5, 5.0,
6L, 6, 6.0,
7L, 7, 7.0,
8L, 8888, 8.0
- ).getInputRecords()
+ ).getRows()
);
pipeline.run().waitUntilFinish();
}
@@ -190,11 +203,11 @@ public class BeamSortRelTest {
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
-
+ TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
1L, 2, 1.0,
1L, 1, 2.0,
2L, 4, 3.0,
@@ -205,7 +218,7 @@ public class BeamSortRelTest {
8L, 8888, 8.0,
8L, 999, 9.0,
10L, 100, 10.0
- ).getInputRecords()
+ ).getRows()
);
pipeline.run().waitUntilFinish();
}
@@ -221,10 +234,4 @@ public class BeamSortRelTest {
TestPipeline pipeline = TestPipeline.create();
BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
}
-
- @Before
- public void prepare() {
- sqlEnv.registerTable("ORDER_DETAILS", orderDetailTable);
- sqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable);
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
index c5aa132..cad3290 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
@@ -18,14 +18,15 @@
package org.apache.beam.dsls.sql.rel;
+import java.sql.Types;
import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -38,17 +39,19 @@ public class BeamUnionRelTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable
- .of(SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
-
- 1L, 1, 1.0,
- 2L, 2, 2.0);
@BeforeClass
public static void prepare() {
- sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+ sqlEnv.registerTable("ORDER_DETAILS",
+ MockedBoundedTable.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
+ 1L, 1, 1.0,
+ 2L, 2, 2.0
+ )
+ );
}
@Test
@@ -62,14 +65,14 @@ public class BeamUnionRelTest {
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
-
+ TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
1L, 1, 1.0,
2L, 2, 2.0
- ).getInputRecords()
+ ).getRows()
);
pipeline.run();
}
@@ -85,16 +88,16 @@ public class BeamUnionRelTest {
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
- MockedBeamSqlTable.of(
- SqlTypeName.BIGINT, "order_id",
- SqlTypeName.INTEGER, "site_id",
- SqlTypeName.DOUBLE, "price",
-
+ TestUtils.RowsBuilder.of(
+ Types.BIGINT, "order_id",
+ Types.INTEGER, "site_id",
+ Types.DOUBLE, "price"
+ ).addRows(
1L, 1, 1.0,
1L, 1, 1.0,
2L, 2, 2.0,
2L, 2, 2.0
- ).getInputRecords()
+ ).getRows()
);
pipeline.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
index 81b1a13..9d13f9b 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
@@ -18,14 +18,15 @@
package org.apache.beam.dsls.sql.rel;
+import java.sql.Types;
import org.apache.beam.dsls.sql.BeamSqlCli;
import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -38,24 +39,37 @@ public class BeamValuesRelTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- private static MockedBeamSqlTable stringTable = MockedBeamSqlTable
- .of(SqlTypeName.VARCHAR, "name",
- SqlTypeName.VARCHAR, "description");
- private static MockedBeamSqlTable intTable = MockedBeamSqlTable
- .of(SqlTypeName.INTEGER, "c0",
- SqlTypeName.INTEGER, "c1");
+ @BeforeClass
+ public static void prepare() {
+ sqlEnv.registerTable("string_table",
+ MockedBoundedTable.of(
+ Types.VARCHAR, "name",
+ Types.VARCHAR, "description"
+ )
+ );
+ sqlEnv.registerTable("int_table",
+ MockedBoundedTable.of(
+ Types.INTEGER, "c0",
+ Types.INTEGER, "c1"
+ )
+ );
+ }
@Test
public void testValues() throws Exception {
String sql = "insert into string_table(name, description) values "
+ "('hello', 'world'), ('james', 'bond')";
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
- SqlTypeName.VARCHAR, "name",
- SqlTypeName.VARCHAR, "description",
- "hello", "world",
- "james", "bond").getInputRecords());
+ PAssert.that(rows).containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.VARCHAR, "name",
+ Types.VARCHAR, "description"
+ ).addRows(
+ "hello", "world",
+ "james", "bond"
+ ).getRows()
+ );
pipeline.run();
}
@@ -63,11 +77,14 @@ public class BeamValuesRelTest {
public void testValues_castInt() throws Exception {
String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
- SqlTypeName.INTEGER, "c0",
- SqlTypeName.INTEGER, "c1",
- 1, 2
- ).getInputRecords());
+ PAssert.that(rows).containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "c0",
+ Types.INTEGER, "c1"
+ ).addRows(
+ 1, 2
+ ).getRows()
+ );
pipeline.run();
}
@@ -75,17 +92,14 @@ public class BeamValuesRelTest {
public void testValues_onlySelect() throws Exception {
String sql = "select 1, '1'";
PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
- SqlTypeName.INTEGER, "EXPR$0",
- SqlTypeName.CHAR, "EXPR$1",
- 1, "1"
- ).getInputRecords());
+ PAssert.that(rows).containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "EXPR$0",
+ Types.CHAR, "EXPR$1"
+ ).addRows(
+ 1, "1"
+ ).getRows()
+ );
pipeline.run();
}
-
- @BeforeClass
- public static void prepareClass() {
- sqlEnv.registerTable("string_table", stringTable);
- sqlEnv.registerTable("int_table", intTable);
- }
}
[3/3] beam git commit: [BEAM-2515] BeamSql: refactor the
MockedBeamSqlTable and related tests
Posted by lc...@apache.org.
[BEAM-2515] BeamSql: refactor the MockedBeamSqlTable and related tests
This closes #3478
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca2bc723
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca2bc723
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca2bc723
Branch: refs/heads/DSL_SQL
Commit: ca2bc723dc00f0a5bf3e6157f8cd79ef4297093b
Parents: 7ba77dd bc66698
Author: Luke Cwik <lc...@google.com>
Authored: Wed Jul 5 09:34:17 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 5 09:34:17 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/dsls/sql/TestUtils.java | 81 +++++--
.../beam/dsls/sql/mock/MockedBoundedTable.java | 126 +++++++++++
.../apache/beam/dsls/sql/mock/MockedTable.java | 42 ++++
.../dsls/sql/mock/MockedUnboundedTable.java | 113 ++++++++++
.../dsls/sql/planner/MockedBeamSqlTable.java | 163 --------------
.../beam/dsls/sql/planner/MockedTable.java | 33 ---
.../dsls/sql/planner/MockedUnboundedTable.java | 120 ----------
.../beam/dsls/sql/rel/BeamIntersectRelTest.java | 78 ++++---
.../rel/BeamJoinRelBoundedVsBoundedTest.java | 141 ++++++------
.../rel/BeamJoinRelUnboundedVsBoundedTest.java | 21 +-
.../BeamJoinRelUnboundedVsUnboundedTest.java | 10 +-
.../beam/dsls/sql/rel/BeamMinusRelTest.java | 78 ++++---
.../sql/rel/BeamSetOperatorRelBaseTest.java | 68 +++---
.../beam/dsls/sql/rel/BeamSortRelTest.java | 222 +++++++++----------
.../beam/dsls/sql/rel/BeamUnionRelTest.java | 47 ++--
.../beam/dsls/sql/rel/BeamValuesRelTest.java | 78 ++++---
16 files changed, 714 insertions(+), 707 deletions(-)
----------------------------------------------------------------------