You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:35 UTC
[12/59] beam git commit: move dsls/sql to sdks/java/extensions/sql
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
deleted file mode 100644
index 1a734bc..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test of BeamSqlUpperExpression.
- */
-public class BeamSqlUpperExpressionTest extends BeamSqlFnExecutorTestBase {
-
- @Test public void evaluate() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- assertEquals("HELLO",
- new BeamSqlUpperExpression(operands).evaluate(record).getValue());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
deleted file mode 100644
index 6c1dcb2..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.mock;
-
-import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType;
-import static org.apache.beam.dsls.sql.TestUtils.buildRows;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * Mocked table for bounded data sources.
- */
-public class MockedBoundedTable extends MockedTable {
- /** rows written to this table. */
- private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
- /** rows flow out from this table. */
- private final List<BeamSqlRow> rows = new ArrayList<>();
-
- public MockedBoundedTable(BeamSqlRowType beamSqlRowType) {
- super(beamSqlRowType);
- }
-
- /**
- * Convenient way to build a mocked bounded table.
- *
- * <p>e.g.
- *
- * <pre>{@code
- * MockedUnboundedTable
- * .of(Types.BIGINT, "order_id",
- * Types.INTEGER, "site_id",
- * Types.DOUBLE, "price",
- * Types.TIMESTAMP, "order_time")
- * }</pre>
- */
- public static MockedBoundedTable of(final Object... args){
- return new MockedBoundedTable(buildBeamSqlRowType(args));
- }
-
- /**
- * Build a mocked bounded table with the specified type.
- */
- public static MockedBoundedTable of(final BeamSqlRowType type) {
- return new MockedBoundedTable(type);
- }
-
-
- /**
- * Add rows to the builder.
- *
- * <p>Sample usage:
- *
- * <pre>{@code
- * addRows(
- * 1, 3, "james", -- first row
- * 2, 5, "bond" -- second row
- * ...
- * )
- * }</pre>
- */
- public MockedBoundedTable addRows(Object... args) {
- List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args));
- this.rows.addAll(rows);
- return this;
- }
-
- @Override
- public BeamIOType getSourceType() {
- return BeamIOType.BOUNDED;
- }
-
- @Override
- public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
- return PBegin.in(pipeline).apply(
- "MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows));
- }
-
- @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
- return new OutputStore();
- }
-
- /**
- * Keep output in {@code CONTENT} for validation.
- *
- */
- public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> {
-
- @Override
- public PDone expand(PCollection<BeamSqlRow> input) {
- input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- CONTENT.add(c.element());
- }
-
- @Teardown
- public void close() {
- CONTENT.clear();
- }
-
- }));
- return PDone.in(input.getPipeline());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
deleted file mode 100644
index 858ae88..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.mock;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * Base class for mocked table.
- */
-public abstract class MockedTable extends BaseBeamTable {
- public static final AtomicInteger COUNTER = new AtomicInteger();
- public MockedTable(BeamSqlRowType beamSqlRowType) {
- super(beamSqlRowType);
- }
-
- @Override
- public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
- throw new UnsupportedOperationException("buildIOWriter unsupported!");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
deleted file mode 100644
index ee6eb22..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.mock;
-
-import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType;
-import static org.apache.beam.dsls.sql.TestUtils.buildRows;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.calcite.util.Pair;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A mocked unbounded table.
- */
-public class MockedUnboundedTable extends MockedTable {
- /** rows flow out from this table with the specified watermark instant. */
- private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
- /** specify the index of column in the row which stands for the event time field. */
- private int timestampField;
- private MockedUnboundedTable(BeamSqlRowType beamSqlRowType) {
- super(beamSqlRowType);
- }
-
- /**
- * Convenient way to build a mocked unbounded table.
- *
- * <p>e.g.
- *
- * <pre>{@code
- * MockedUnboundedTable
- * .of(Types.BIGINT, "order_id",
- * Types.INTEGER, "site_id",
- * Types.DOUBLE, "price",
- * Types.TIMESTAMP, "order_time")
- * }</pre>
- */
- public static MockedUnboundedTable of(final Object... args){
- return new MockedUnboundedTable(buildBeamSqlRowType(args));
- }
-
- public MockedUnboundedTable timestampColumnIndex(int idx) {
- this.timestampField = idx;
- return this;
- }
-
- /**
- * Add rows to the builder.
- *
- * <p>Sample usage:
- *
- * <pre>{@code
- * addRows(
- * duration, -- duration which stands for the corresponding watermark instant
- * 1, 3, "james", -- first row
- * 2, 5, "bond" -- second row
- * ...
- * )
- * }</pre>
- */
- public MockedUnboundedTable addRows(Duration duration, Object... args) {
- List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args));
- // record the watermark + rows
- this.timestampedRows.add(Pair.of(duration, rows));
- return this;
- }
-
- @Override public BeamIOType getSourceType() {
- return BeamIOType.UNBOUNDED;
- }
-
- @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
- TestStream.Builder<BeamSqlRow> values = TestStream.create(
- new BeamSqlRowCoder(beamSqlRowType));
-
- for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
- values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));
- for (int i = 0; i < pair.getValue().size(); i++) {
- values = values.addElements(TimestampedValue.of(pair.getValue().get(i),
- new Instant(pair.getValue().get(i).getDate(timestampField))));
- }
- }
-
- return pipeline.begin().apply(
- "MockedUnboundedTable_" + COUNTER.incrementAndGet(),
- values.advanceWatermarkToInfinity());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
deleted file mode 100644
index 3b37143..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.sql.Types;
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamIntersectRel}.
- */
-public class BeamIntersectRelTest {
- static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
-
- @BeforeClass
- public static void prepare() {
- sqlEnv.registerTable("ORDER_DETAILS1",
- MockedBoundedTable.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 1, 1.0,
- 1L, 1, 1.0,
- 2L, 2, 2.0,
- 4L, 4, 4.0
- )
- );
-
- sqlEnv.registerTable("ORDER_DETAILS2",
- MockedBoundedTable.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 1, 1.0,
- 2L, 2, 2.0,
- 3L, 3, 3.0
- )
- );
- }
-
- @Test
- public void testIntersect() throws Exception {
- String sql = "";
- sql += "SELECT order_id, site_id, price "
- + "FROM ORDER_DETAILS1 "
- + " INTERSECT "
- + "SELECT order_id, site_id, price "
- + "FROM ORDER_DETAILS2 ";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 1, 1.0,
- 2L, 2, 2.0
- ).getRows());
-
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testIntersectAll() throws Exception {
- String sql = "";
- sql += "SELECT order_id, site_id, price "
- + "FROM ORDER_DETAILS1 "
- + " INTERSECT ALL "
- + "SELECT order_id, site_id, price "
- + "FROM ORDER_DETAILS2 ";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).satisfies(new CheckSize(3));
-
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 1, 1.0,
- 1L, 1, 1.0,
- 2L, 2, 2.0
- ).getRows());
-
- pipeline.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
deleted file mode 100644
index 24a3256..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.sql.Types;
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Bounded + Bounded Test for {@code BeamJoinRel}.
- */
-public class BeamJoinRelBoundedVsBoundedTest {
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
- private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
-
- public static final MockedBoundedTable ORDER_DETAILS1 =
- MockedBoundedTable.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "site_id",
- Types.INTEGER, "price"
- ).addRows(
- 1, 2, 3,
- 2, 3, 3,
- 3, 4, 5
- );
-
- public static final MockedBoundedTable ORDER_DETAILS2 =
- MockedBoundedTable.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "site_id",
- Types.INTEGER, "price"
- ).addRows(
- 1, 2, 3,
- 2, 3, 3,
- 3, 4, 5
- );
-
- @BeforeClass
- public static void prepare() {
- beamSqlEnv.registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
- beamSqlEnv.registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
- }
-
- @Test
- public void testInnerJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1"
- + " JOIN ORDER_DETAILS2 o2"
- + " on "
- + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
- ;
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "site_id",
- Types.INTEGER, "price",
- Types.INTEGER, "order_id0",
- Types.INTEGER, "site_id0",
- Types.INTEGER, "price0"
- ).addRows(
- 2, 3, 3, 1, 2, 3
- ).getRows());
- pipeline.run();
- }
-
- @Test
- public void testLeftOuterJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1"
- + " LEFT OUTER JOIN ORDER_DETAILS2 o2"
- + " on "
- + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
- ;
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- pipeline.enableAbandonedNodeEnforcement(false);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "site_id",
- Types.INTEGER, "price",
- Types.INTEGER, "order_id0",
- Types.INTEGER, "site_id0",
- Types.INTEGER, "price0"
- ).addRows(
- 1, 2, 3, null, null, null,
- 2, 3, 3, 1, 2, 3,
- 3, 4, 5, null, null, null
- ).getRows());
- pipeline.run();
- }
-
- @Test
- public void testRightOuterJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1"
- + " RIGHT OUTER JOIN ORDER_DETAILS2 o2"
- + " on "
- + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
- ;
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "site_id",
- Types.INTEGER, "price",
- Types.INTEGER, "order_id0",
- Types.INTEGER, "site_id0",
- Types.INTEGER, "price0"
- ).addRows(
- 2, 3, 3, 1, 2, 3,
- null, null, null, 2, 3, 3,
- null, null, null, 3, 4, 5
- ).getRows());
- pipeline.run();
- }
-
- @Test
- public void testFullOuterJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1"
- + " FULL OUTER JOIN ORDER_DETAILS2 o2"
- + " on "
- + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
- ;
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "site_id",
- Types.INTEGER, "price",
- Types.INTEGER, "order_id0",
- Types.INTEGER, "site_id0",
- Types.INTEGER, "price0"
- ).addRows(
- 2, 3, 3, 1, 2, 3,
- 1, 2, 3, null, null, null,
- 3, 4, 5, null, null, null,
- null, null, null, 2, 3, 3,
- null, null, null, 3, 4, 5
- ).getRows());
- pipeline.run();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testException_nonEqualJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1"
- + " JOIN ORDER_DETAILS2 o2"
- + " on "
- + " o1.order_id>o2.site_id"
- ;
-
- pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- pipeline.run();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testException_crossJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
-
- pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- pipeline.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
deleted file mode 100644
index 3f0c98e..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.sql.Types;
-import java.util.Date;
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.mock.MockedUnboundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Unbounded + Unbounded Test for {@code BeamJoinRel}.
- */
-public class BeamJoinRelUnboundedVsBoundedTest {
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
- private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
- public static final Date FIRST_DATE = new Date(1);
- public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
- public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1);
- private static final Duration WINDOW_SIZE = Duration.standardHours(1);
-
- @BeforeClass
- public static void prepare() {
- beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
- .of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "site_id",
- Types.INTEGER, "price",
- Types.TIMESTAMP, "order_time"
- )
- .timestampColumnIndex(3)
- .addRows(
- Duration.ZERO,
- 1, 1, 1, FIRST_DATE,
- 1, 2, 2, FIRST_DATE
- )
- .addRows(
- WINDOW_SIZE.plus(Duration.standardSeconds(1)),
- 2, 2, 3, SECOND_DATE,
- 2, 3, 3, SECOND_DATE,
- // this late data is omitted
- 1, 2, 3, FIRST_DATE
- )
- .addRows(
- WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)),
- 3, 3, 3, THIRD_DATE,
- // this late data is omitted
- 2, 2, 3, SECOND_DATE
- )
- );
-
- beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable
- .of(Types.INTEGER, "order_id",
- Types.VARCHAR, "buyer"
- ).addRows(
- 1, "james",
- 2, "bond"
- ));
- }
-
- @Test
- public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception {
- String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
- + " JOIN "
- + " ORDER_DETAILS1 o2 "
- + " on "
- + " o1.order_id=o2.order_id"
- ;
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
- .containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "sum_site_id",
- Types.VARCHAR, "buyer"
- ).addRows(
- 1, 3, "james",
- 2, 5, "bond"
- ).getStringRows()
- );
- pipeline.run();
- }
-
- @Test
- public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception {
- String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
- + " ORDER_DETAILS1 o2 "
- + " JOIN "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
- + " on "
- + " o1.order_id=o2.order_id"
- ;
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
- .containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "sum_site_id",
- Types.VARCHAR, "buyer"
- ).addRows(
- 1, 3, "james",
- 2, 5, "bond"
- ).getStringRows()
- );
- pipeline.run();
- }
-
- @Test
- public void testLeftOuterJoin() throws Exception {
- String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
- + " LEFT OUTER JOIN "
- + " ORDER_DETAILS1 o2 "
- + " on "
- + " o1.order_id=o2.order_id"
- ;
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld")));
- PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
- .containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "sum_site_id",
- Types.VARCHAR, "buyer"
- ).addRows(
- 1, 3, "james",
- 2, 5, "bond",
- 3, 3, null
- ).getStringRows()
- );
- pipeline.run();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testLeftOuterJoinError() throws Exception {
- String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
- + " ORDER_DETAILS1 o2 "
- + " LEFT OUTER JOIN "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
- + " on "
- + " o1.order_id=o2.order_id"
- ;
- pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- pipeline.run();
- }
-
- @Test
- public void testRightOuterJoin() throws Exception {
- String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
- + " ORDER_DETAILS1 o2 "
- + " RIGHT OUTER JOIN "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
- + " on "
- + " o1.order_id=o2.order_id"
- ;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
- .containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "sum_site_id",
- Types.VARCHAR, "buyer"
- ).addRows(
- 1, 3, "james",
- 2, 5, "bond",
- 3, 3, null
- ).getStringRows()
- );
- pipeline.run();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testRightOuterJoinError() throws Exception {
- String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
- + " RIGHT OUTER JOIN "
- + " ORDER_DETAILS1 o2 "
- + " on "
- + " o1.order_id=o2.order_id"
- ;
-
- pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- pipeline.run();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testFullOuterJoinError() throws Exception {
- String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
- + " ORDER_DETAILS1 o2 "
- + " FULL OUTER JOIN "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
- + " on "
- + " o1.order_id=o2.order_id"
- ;
- pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- pipeline.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
deleted file mode 100644
index d76e875..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.sql.Types;
-import java.util.Date;
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.mock.MockedUnboundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Unbounded + Unbounded Test for {@code BeamJoinRel}.
- */
-public class BeamJoinRelUnboundedVsUnboundedTest {
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
- private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
- public static final Date FIRST_DATE = new Date(1);
- public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
-
- private static final Duration WINDOW_SIZE = Duration.standardHours(1);
-
- @BeforeClass
- public static void prepare() {
- beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
- .of(Types.INTEGER, "order_id",
- Types.INTEGER, "site_id",
- Types.INTEGER, "price",
- Types.TIMESTAMP, "order_time"
- )
- .timestampColumnIndex(3)
- .addRows(
- Duration.ZERO,
- 1, 1, 1, FIRST_DATE,
- 1, 2, 6, FIRST_DATE
- )
- .addRows(
- WINDOW_SIZE.plus(Duration.standardMinutes(1)),
- 2, 2, 7, SECOND_DATE,
- 2, 3, 8, SECOND_DATE,
- // this late record is omitted(First window)
- 1, 3, 3, FIRST_DATE
- )
- .addRows(
- // this late record is omitted(Second window)
- WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)),
- 2, 3, 3, SECOND_DATE
- )
- );
- }
-
- @Test
- public void testInnerJoin() throws Exception {
- String sql = "SELECT * FROM "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
- + " JOIN "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
- + " on "
- + " o1.order_id=o2.order_id"
- ;
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
- .containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "sum_site_id",
- Types.INTEGER, "order_id0",
- Types.INTEGER, "sum_site_id0").addRows(
- 1, 3, 1, 3,
- 2, 5, 2, 5
- ).getStringRows()
- );
- pipeline.run();
- }
-
- @Test
- public void testLeftOuterJoin() throws Exception {
- String sql = "SELECT * FROM "
- + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
- + " LEFT OUTER JOIN "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
- + " on "
- + " o1.order_id=o2.order_id"
- ;
-
- // 1, 1 | 1, 3
- // 2, 2 | NULL, NULL
- // ---- | -----
- // 2, 2 | 2, 5
- // 3, 3 | NULL, NULL
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
- .containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "sum_site_id",
- Types.INTEGER, "order_id0",
- Types.INTEGER, "sum_site_id0"
- ).addRows(
- 1, 1, 1, 3,
- 2, 2, null, null,
- 2, 2, 2, 5,
- 3, 3, null, null
- ).getStringRows()
- );
- pipeline.run();
- }
-
- @Test
- public void testRightOuterJoin() throws Exception {
- String sql = "SELECT * FROM "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
- + " RIGHT OUTER JOIN "
- + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
- + " on "
- + " o1.order_id=o2.order_id"
- ;
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
- .containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id",
- Types.INTEGER, "sum_site_id",
- Types.INTEGER, "order_id0",
- Types.INTEGER, "sum_site_id0"
- ).addRows(
- 1, 3, 1, 1,
- null, null, 2, 2,
- 2, 5, 2, 2,
- null, null, 3, 3
- ).getStringRows()
- );
- pipeline.run();
- }
-
- @Test
- public void testFullOuterJoin() throws Exception {
- String sql = "SELECT * FROM "
- + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
- + " FULL OUTER JOIN "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
- + " on "
- + " o1.order_id1=o2.order_id"
- ;
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));
- PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
- .containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "order_id1",
- Types.INTEGER, "sum_site_id",
- Types.INTEGER, "order_id",
- Types.INTEGER, "sum_site_id0"
- ).addRows(
- 1, 1, 1, 3,
- 6, 2, null, null,
- 7, 2, null, null,
- 8, 3, null, null,
- null, null, 2, 5
- ).getStringRows()
- );
- pipeline.run();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testWindowsMismatch() throws Exception {
- String sql = "SELECT * FROM "
- + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 "
- + " LEFT OUTER JOIN "
- + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
- + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
- + " on "
- + " o1.order_id=o2.order_id"
- ;
- pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
- pipeline.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
deleted file mode 100644
index 80da8fb..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.sql.Types;
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamMinusRel}.
- */
-public class BeamMinusRelTest {
- static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
-
- @BeforeClass
- public static void prepare() {
- sqlEnv.registerTable("ORDER_DETAILS1",
- MockedBoundedTable.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 1, 1.0,
- 1L, 1, 1.0,
- 2L, 2, 2.0,
- 4L, 4, 4.0,
- 4L, 4, 4.0
- )
- );
-
- sqlEnv.registerTable("ORDER_DETAILS2",
- MockedBoundedTable.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 1, 1.0,
- 2L, 2, 2.0,
- 3L, 3, 3.0
- )
- );
- }
-
- @Test
- public void testExcept() throws Exception {
- String sql = "";
- sql += "SELECT order_id, site_id, price "
- + "FROM ORDER_DETAILS1 "
- + " EXCEPT "
- + "SELECT order_id, site_id, price "
- + "FROM ORDER_DETAILS2 ";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 4L, 4, 4.0
- ).getRows());
-
- pipeline.run();
- }
-
- @Test
- public void testExceptAll() throws Exception {
- String sql = "";
- sql += "SELECT order_id, site_id, price "
- + "FROM ORDER_DETAILS1 "
- + " EXCEPT ALL "
- + "SELECT order_id, site_id, price "
- + "FROM ORDER_DETAILS2 ";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).satisfies(new CheckSize(2));
-
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 4L, 4, 4.0,
- 4L, 4, 4.0
- ).getRows());
-
- pipeline.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
deleted file mode 100644
index d0b01df..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.sql.Types;
-import java.util.Date;
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamSetOperatorRelBase}.
- */
-public class BeamSetOperatorRelBaseTest {
- static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
- public static final Date THE_DATE = new Date(100000);
-
- @BeforeClass
- public static void prepare() {
- sqlEnv.registerTable("ORDER_DETAILS",
- MockedBoundedTable.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price",
- Types.TIMESTAMP, "order_time"
- ).addRows(
- 1L, 1, 1.0, THE_DATE,
- 2L, 2, 2.0, THE_DATE
- )
- );
- }
-
- @Test
- public void testSameWindow() throws Exception {
- String sql = "SELECT "
- + " order_id, site_id, count(*) as cnt "
- + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
- + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
- + " UNION SELECT "
- + " order_id, site_id, count(*) as cnt "
- + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
- + ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- // compare valueInString to ignore the windowStart & windowEnd
- PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
- .containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.BIGINT, "cnt"
- ).addRows(
- 1L, 1, 1L,
- 2L, 2, 1L
- ).getStringRows());
- pipeline.run();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDifferentWindows() throws Exception {
- String sql = "SELECT "
- + " order_id, site_id, count(*) as cnt "
- + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
- + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
- + " UNION SELECT "
- + " order_id, site_id, count(*) as cnt "
- + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
- + ", TUMBLE(order_time, INTERVAL '2' HOUR) ";
-
- // use a real pipeline rather than the TestPipeline because we are
- // testing exceptions, the pipeline will not actually run.
- Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
- BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv);
- pipeline.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
deleted file mode 100644
index 1067926..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.sql.Types;
-import java.util.Date;
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamSortRel}.
- */
-public class BeamSortRelTest {
- static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
-
- @Before
- public void prepare() {
- sqlEnv.registerTable("ORDER_DETAILS",
- MockedBoundedTable.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price",
- Types.TIMESTAMP, "order_time"
- ).addRows(
- 1L, 2, 1.0, new Date(),
- 1L, 1, 2.0, new Date(),
- 2L, 4, 3.0, new Date(),
- 2L, 1, 4.0, new Date(),
- 5L, 5, 5.0, new Date(),
- 6L, 6, 6.0, new Date(),
- 7L, 7, 7.0, new Date(),
- 8L, 8888, 8.0, new Date(),
- 8L, 999, 9.0, new Date(),
- 10L, 100, 10.0, new Date()
- )
- );
- sqlEnv.registerTable("SUB_ORDER_RAM",
- MockedBoundedTable.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- )
- );
- }
-
- @Test
- public void testOrderBy_basic() throws Exception {
- String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
- + " order_id, site_id, price "
- + "FROM ORDER_DETAILS "
- + "ORDER BY order_id asc, site_id desc limit 4";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 2, 1.0,
- 1L, 1, 2.0,
- 2L, 4, 3.0,
- 2L, 1, 4.0
- ).getRows());
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testOrderBy_nullsFirst() throws Exception {
- sqlEnv.registerTable("ORDER_DETAILS",
- MockedBoundedTable.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 2, 1.0,
- 1L, null, 2.0,
- 2L, 1, 3.0,
- 2L, null, 4.0,
- 5L, 5, 5.0
- )
- );
- sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
- .of(Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"));
-
- String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
- + " order_id, site_id, price "
- + "FROM ORDER_DETAILS "
- + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, null, 2.0,
- 1L, 2, 1.0,
- 2L, null, 4.0,
- 2L, 1, 3.0
- ).getRows()
- );
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testOrderBy_nullsLast() throws Exception {
- sqlEnv.registerTable("ORDER_DETAILS", MockedBoundedTable
- .of(Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 2, 1.0,
- 1L, null, 2.0,
- 2L, 1, 3.0,
- 2L, null, 4.0,
- 5L, 5, 5.0));
- sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
- .of(Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"));
-
- String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
- + " order_id, site_id, price "
- + "FROM ORDER_DETAILS "
- + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 2, 1.0,
- 1L, null, 2.0,
- 2L, 1, 3.0,
- 2L, null, 4.0
- ).getRows()
- );
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testOrderBy_with_offset() throws Exception {
- String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
- + " order_id, site_id, price "
- + "FROM ORDER_DETAILS "
- + "ORDER BY order_id asc, site_id desc limit 4 offset 4";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 5L, 5, 5.0,
- 6L, 6, 6.0,
- 7L, 7, 7.0,
- 8L, 8888, 8.0
- ).getRows()
- );
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testOrderBy_bigFetch() throws Exception {
- String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
- + " order_id, site_id, price "
- + "FROM ORDER_DETAILS "
- + "ORDER BY order_id asc, site_id desc limit 11";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 2, 1.0,
- 1L, 1, 2.0,
- 2L, 4, 3.0,
- 2L, 1, 4.0,
- 5L, 5, 5.0,
- 6L, 6, 6.0,
- 7L, 7, 7.0,
- 8L, 8888, 8.0,
- 8L, 999, 9.0,
- 10L, 100, 10.0
- ).getRows()
- );
- pipeline.run().waitUntilFinish();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testOrderBy_exception() throws Exception {
- String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id) SELECT "
- + " order_id, COUNT(*) "
- + "FROM ORDER_DETAILS "
- + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
- + "ORDER BY order_id asc limit 11";
-
- TestPipeline pipeline = TestPipeline.create();
- BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
deleted file mode 100644
index cad3290..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.sql.Types;
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamUnionRel}.
- */
-public class BeamUnionRelTest {
- static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
-
- @BeforeClass
- public static void prepare() {
- sqlEnv.registerTable("ORDER_DETAILS",
- MockedBoundedTable.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 1, 1.0,
- 2L, 2, 2.0
- )
- );
- }
-
- @Test
- public void testUnion() throws Exception {
- String sql = "SELECT "
- + " order_id, site_id, price "
- + "FROM ORDER_DETAILS "
- + " UNION SELECT "
- + " order_id, site_id, price "
- + "FROM ORDER_DETAILS ";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 1, 1.0,
- 2L, 2, 2.0
- ).getRows()
- );
- pipeline.run();
- }
-
- @Test
- public void testUnionAll() throws Exception {
- String sql = "SELECT "
- + " order_id, site_id, price "
- + "FROM ORDER_DETAILS"
- + " UNION ALL "
- + " SELECT order_id, site_id, price "
- + "FROM ORDER_DETAILS";
-
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.BIGINT, "order_id",
- Types.INTEGER, "site_id",
- Types.DOUBLE, "price"
- ).addRows(
- 1L, 1, 1.0,
- 1L, 1, 1.0,
- 2L, 2, 2.0,
- 2L, 2, 2.0
- ).getRows()
- );
- pipeline.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
deleted file mode 100644
index 9d13f9b..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.sql.Types;
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for {@code BeamValuesRel}.
- */
-public class BeamValuesRelTest {
- static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
-
- @BeforeClass
- public static void prepare() {
- sqlEnv.registerTable("string_table",
- MockedBoundedTable.of(
- Types.VARCHAR, "name",
- Types.VARCHAR, "description"
- )
- );
- sqlEnv.registerTable("int_table",
- MockedBoundedTable.of(
- Types.INTEGER, "c0",
- Types.INTEGER, "c1"
- )
- );
- }
-
- @Test
- public void testValues() throws Exception {
- String sql = "insert into string_table(name, description) values "
- + "('hello', 'world'), ('james', 'bond')";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.VARCHAR, "name",
- Types.VARCHAR, "description"
- ).addRows(
- "hello", "world",
- "james", "bond"
- ).getRows()
- );
- pipeline.run();
- }
-
- @Test
- public void testValues_castInt() throws Exception {
- String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "c0",
- Types.INTEGER, "c1"
- ).addRows(
- 1, 2
- ).getRows()
- );
- pipeline.run();
- }
-
- @Test
- public void testValues_onlySelect() throws Exception {
- String sql = "select 1, '1'";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- Types.INTEGER, "EXPR$0",
- Types.CHAR, "EXPR$1"
- ).addRows(
- 1, "1"
- ).getRows()
- );
- pipeline.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java
deleted file mode 100644
index ce532df..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.junit.Assert;
-
-/**
- * Utility class to check size of BeamSQLRow iterable.
- */
-public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
- private int size;
- public CheckSize(int size) {
- this.size = size;
- }
- @Override public Void apply(Iterable<BeamSqlRow> input) {
- int count = 0;
- for (BeamSqlRow row : input) {
- count++;
- }
- Assert.assertEquals(size, count);
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
deleted file mode 100644
index e41e341..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.schema;
-
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.GregorianCalendar;
-
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Tests for BeamSqlRowCoder.
- */
-public class BeamSqlRowCoderTest {
-
- @Test
- public void encodeAndDecode() throws Exception {
- final RelProtoDataType protoRowType = new RelProtoDataType() {
- @Override
- public RelDataType apply(RelDataTypeFactory a0) {
- return a0.builder()
- .add("col_tinyint", SqlTypeName.TINYINT)
- .add("col_smallint", SqlTypeName.SMALLINT)
- .add("col_integer", SqlTypeName.INTEGER)
- .add("col_bigint", SqlTypeName.BIGINT)
- .add("col_float", SqlTypeName.FLOAT)
- .add("col_double", SqlTypeName.DOUBLE)
- .add("col_decimal", SqlTypeName.DECIMAL)
- .add("col_string_varchar", SqlTypeName.VARCHAR)
- .add("col_time", SqlTypeName.TIME)
- .add("col_timestamp", SqlTypeName.TIMESTAMP)
- .add("col_boolean", SqlTypeName.BOOLEAN)
- .build();
- }
- };
-
- BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(
- protoRowType.apply(new JavaTypeFactoryImpl(
- RelDataTypeSystem.DEFAULT)));
- BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
- row.addField("col_tinyint", Byte.valueOf("1"));
- row.addField("col_smallint", Short.valueOf("1"));
- row.addField("col_integer", 1);
- row.addField("col_bigint", 1L);
- row.addField("col_float", 1.1F);
- row.addField("col_double", 1.1);
- row.addField("col_decimal", BigDecimal.ZERO);
- row.addField("col_string_varchar", "hello");
- GregorianCalendar calendar = new GregorianCalendar();
- calendar.setTime(new Date());
- row.addField("col_time", calendar);
- row.addField("col_timestamp", new Date());
- row.addField("col_boolean", true);
-
-
- BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRowType);
- CoderProperties.coderDecodeEncodeEqual(coder, row);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
deleted file mode 100644
index 01cd960..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.schema.kafka;
-
-import java.io.Serializable;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.commons.csv.CSVFormat;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test for BeamKafkaCSVTable.
- */
-public class BeamKafkaCSVTableTest {
- @Rule
- public TestPipeline pipeline = TestPipeline.create();
- public static BeamSqlRow row1 = new BeamSqlRow(genRowType());
- public static BeamSqlRow row2 = new BeamSqlRow(genRowType());
-
- @BeforeClass
- public static void setUp() {
- row1.addField(0, 1L);
- row1.addField(1, 1);
- row1.addField(2, 1.0);
-
- row2.addField(0, 2L);
- row2.addField(1, 2);
- row2.addField(2, 2.0);
- }
-
- @Test public void testCsvRecorderDecoder() throws Exception {
- PCollection<BeamSqlRow> result = pipeline
- .apply(
- Create.of("1,\"1\",1.0", "2,2,2.0")
- )
- .apply(ParDo.of(new String2KvBytes()))
- .apply(
- new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)
- );
-
- PAssert.that(result).containsInAnyOrder(row1, row2);
-
- pipeline.run();
- }
-
- @Test public void testCsvRecorderEncoder() throws Exception {
- PCollection<BeamSqlRow> result = pipeline
- .apply(
- Create.of(row1, row2)
- )
- .apply(
- new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), CSVFormat.DEFAULT)
- ).apply(
- new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)
- );
-
- PAssert.that(result).containsInAnyOrder(row1, row2);
-
- pipeline.run();
- }
-
- private static BeamSqlRowType genRowType() {
- return CalciteUtils.toBeamRowType(new RelProtoDataType() {
-
- @Override public RelDataType apply(RelDataTypeFactory a0) {
- return a0.builder().add("order_id", SqlTypeName.BIGINT)
- .add("site_id", SqlTypeName.INTEGER)
- .add("price", SqlTypeName.DOUBLE).build();
- }
- }.apply(BeamQueryPlanner.TYPE_FACTORY));
- }
-
- private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>
- implements Serializable {
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- ctx.output(KV.of(new byte[] {}, ctx.element().getBytes()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
deleted file mode 100644
index b6e11e5..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.schema.text;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVPrinter;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Tests for {@code BeamTextCSVTable}.
- */
-public class BeamTextCSVTableTest {
-
- @Rule public TestPipeline pipeline = TestPipeline.create();
- @Rule public TestPipeline pipeline2 = TestPipeline.create();
-
- /**
- * testData.
- *
- * <p>
- * The types of the csv fields are:
- * integer,bigint,float,double,string
- * </p>
- */
- private static Object[] data1 = new Object[] { 1, 1L, 1.1F, 1.1, "james" };
- private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" };
-
- private static List<Object[]> testData = Arrays.asList(data1, data2);
- private static List<BeamSqlRow> testDataRows = new ArrayList<BeamSqlRow>() {{
- for (Object[] data : testData) {
- add(buildRow(data));
- }
- }};
-
- private static Path tempFolder;
- private static File readerSourceFile;
- private static File writerTargetFile;
-
- @Test public void testBuildIOReader() {
- PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
- readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
- PAssert.that(rows).containsInAnyOrder(testDataRows);
- pipeline.run();
- }
-
- @Test public void testBuildIOWriter() {
- new BeamTextCSVTable(buildBeamSqlRowType(),
- readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
- .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath())
- .buildIOWriter());
- pipeline.run();
-
- PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
- writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
-
- // confirm the two reads match
- PAssert.that(rows).containsInAnyOrder(testDataRows);
- pipeline2.run();
- }
-
- @BeforeClass public static void setUp() throws IOException {
- tempFolder = Files.createTempDirectory("BeamTextTableTest");
- readerSourceFile = writeToFile(testData, "readerSourceFile.txt");
- writerTargetFile = writeToFile(testData, "writerTargetFile.txt");
- }
-
- @AfterClass public static void teardownClass() throws IOException {
- Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() {
-
- @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
- throws IOException {
- Files.delete(file);
- return FileVisitResult.CONTINUE;
- }
-
- @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc)
- throws IOException {
- Files.delete(dir);
- return FileVisitResult.CONTINUE;
- }
- });
- }
-
- private static File writeToFile(List<Object[]> rows, String filename) throws IOException {
- File file = tempFolder.resolve(filename).toFile();
- OutputStream output = new FileOutputStream(file);
- writeToStreamAndClose(rows, output);
- return file;
- }
-
- /**
- * Helper that writes the given lines (adding a newline in between) to a stream, then closes the
- * stream.
- */
- private static void writeToStreamAndClose(List<Object[]> rows, OutputStream outputStream) {
- try (PrintStream writer = new PrintStream(outputStream)) {
- CSVPrinter printer = CSVFormat.DEFAULT.print(writer);
- for (Object[] row : rows) {
- for (Object field : row) {
- printer.print(field);
- }
- printer.println();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- private RelProtoDataType buildRowType() {
- return new RelProtoDataType() {
-
- @Override public RelDataType apply(RelDataTypeFactory a0) {
- return a0.builder().add("id", SqlTypeName.INTEGER).add("order_id", SqlTypeName.BIGINT)
- .add("price", SqlTypeName.FLOAT).add("amount", SqlTypeName.DOUBLE)
- .add("user_name", SqlTypeName.VARCHAR).build();
- }
- };
- }
-
- private static RelDataType buildRelDataType() {
- return BeamQueryPlanner.TYPE_FACTORY.builder().add("id", SqlTypeName.INTEGER)
- .add("order_id", SqlTypeName.BIGINT).add("price", SqlTypeName.FLOAT)
- .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build();
- }
-
- private static BeamSqlRowType buildBeamSqlRowType() {
- return CalciteUtils.toBeamRowType(buildRelDataType());
- }
-
- private static BeamSqlRow buildRow(Object[] data) {
- return new BeamSqlRow(buildBeamSqlRowType(), Arrays.asList(data));
- }
-}