You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/15 18:41:57 UTC
[2/5] beam git commit: [BEAM-2740] Hide BeamSqlEnv.
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
index 08678d1..456662f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
@@ -40,15 +40,13 @@ public class BeamSqlApiSurfaceTest {
final Set<String> allowed =
ImmutableSet.of(
"org.apache.beam",
- "org.joda.time",
- "org.apache.commons.csv");
+ "org.joda.time");
ApiSurface surface = ApiSurface
- .ofClass(BeamSqlCli.class)
- .includingClass(BeamSql.class)
- .includingClass(BeamSqlEnv.class)
- .includingPackage("org.apache.beam.sdk.extensions.sql.schema",
- getClass().getClassLoader())
+ .ofClass(BeamSql.class)
+ .includingClass(BeamSqlUdf.class)
+ .includingClass(BeamRecordSqlType.class)
+ .includingClass(BeamSqlRecordHelper.class)
.pruningPrefix("java")
.pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*Test")
.pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*TestBase");
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index db562da..d99ec20 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.extensions.sql;
import java.sql.Types;
import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index ef75ee2..b27435c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -25,7 +25,6 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index 0876dd9..47109e0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -24,7 +24,6 @@ import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBo
import java.sql.Types;
import java.util.Arrays;
import org.apache.beam.sdk.coders.BeamRecordCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.BeamRecord;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index 46aea99..e36eb2b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.extensions.sql;
import java.sql.Types;
import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 1541123..8db9d7a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -20,8 +20,6 @@ package org.apache.beam.sdk.extensions.sql;
import java.sql.Types;
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 373deb7..4a1f8a0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -21,7 +21,6 @@ package org.apache.beam.sdk.extensions.sql;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.BeamRecord;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index 97905c5..9d12126 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -19,12 +19,12 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter;
import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.Lex;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
new file mode 100644
index 0000000..906ccfd
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Base class for rel tests.
+ */
+public class BaseRelTest {
+ public PCollection<BeamRecord> compilePipeline (
+ String sql, Pipeline pipeline, BeamSqlEnv sqlEnv) throws Exception {
+ return sqlEnv.getPlanner().compileBeamPipeline(sql, pipeline, sqlEnv);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
index a51cc30..8e41d0a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
@@ -19,9 +19,8 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -34,7 +33,7 @@ import org.junit.Test;
/**
* Test for {@code BeamIntersectRel}.
*/
-public class BeamIntersectRelTest {
+public class BeamIntersectRelTest extends BaseRelTest {
static BeamSqlEnv sqlEnv = new BeamSqlEnv();
@Rule
@@ -77,7 +76,7 @@ public class BeamIntersectRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -100,7 +99,7 @@ public class BeamIntersectRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).satisfies(new CheckSize(3));
PAssert.that(rows).containsInAnyOrder(
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
index dde1540..e0d691b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -19,9 +19,8 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -34,10 +33,10 @@ import org.junit.Test;
/**
* Bounded + Bounded Test for {@code BeamJoinRel}.
*/
-public class BeamJoinRelBoundedVsBoundedTest {
+public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+ private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();
public static final MockedBoundedTable ORDER_DETAILS1 =
MockedBoundedTable.of(
@@ -63,8 +62,8 @@ public class BeamJoinRelBoundedVsBoundedTest {
@BeforeClass
public static void prepare() {
- beamSqlEnv.registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
- beamSqlEnv.registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
+ BEAM_SQL_ENV.registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
+ BEAM_SQL_ENV.registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
}
@Test
@@ -77,7 +76,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
+ " o1.order_id=o2.site_id AND o2.price=o1.site_id"
;
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.INTEGER, "order_id",
@@ -102,7 +101,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
+ " o1.order_id=o2.site_id AND o2.price=o1.site_id"
;
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
pipeline.enableAbandonedNodeEnforcement(false);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -130,7 +129,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
+ " o1.order_id=o2.site_id AND o2.price=o1.site_id"
;
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.INTEGER, "order_id",
@@ -157,7 +156,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
+ " o1.order_id=o2.site_id AND o2.price=o1.site_id"
;
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.INTEGER, "order_id",
@@ -187,7 +186,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
;
pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ compilePipeline(sql, pipeline, BEAM_SQL_ENV);
pipeline.run();
}
@@ -198,7 +197,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
+ "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ compilePipeline(sql, pipeline, BEAM_SQL_ENV);
pipeline.run();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 28ad99c..c5145ec 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -20,9 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.sql.Types;
import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
@@ -39,10 +38,10 @@ import org.junit.Test;
/**
 * Unbounded + Bounded Test for {@code BeamJoinRel}.
*/
-public class BeamJoinRelUnboundedVsBoundedTest {
+public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+ private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();
public static final Date FIRST_DATE = new Date(1);
public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1);
@@ -50,7 +49,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
@BeforeClass
public static void prepare() {
- beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
+ BEAM_SQL_ENV.registerTable("ORDER_DETAILS", MockedUnboundedTable
.of(
Types.INTEGER, "order_id",
Types.INTEGER, "site_id",
@@ -78,7 +77,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
)
);
- beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable
+ BEAM_SQL_ENV.registerTable("ORDER_DETAILS1", MockedBoundedTable
.of(Types.INTEGER, "order_id",
Types.VARCHAR, "buyer"
).addRows(
@@ -98,7 +97,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -124,7 +123,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -150,7 +149,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld")));
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
@@ -178,7 +177,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
+ " o1.order_id=o2.order_id"
;
pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ compilePipeline(sql, pipeline, BEAM_SQL_ENV);
pipeline.run();
}
@@ -192,7 +191,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
+ " on "
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -220,7 +219,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
;
pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ compilePipeline(sql, pipeline, BEAM_SQL_ENV);
pipeline.run();
}
@@ -235,7 +234,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
+ " o1.order_id=o2.order_id"
;
pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ compilePipeline(sql, pipeline, BEAM_SQL_ENV);
pipeline.run();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
index a5a2e85..e5470ca 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -20,9 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.sql.Types;
import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
import org.apache.beam.sdk.testing.PAssert;
@@ -38,10 +37,10 @@ import org.junit.Test;
/**
* Unbounded + Unbounded Test for {@code BeamJoinRel}.
*/
-public class BeamJoinRelUnboundedVsUnboundedTest {
+public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+ private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();
public static final Date FIRST_DATE = new Date(1);
public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
@@ -49,7 +48,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
@BeforeClass
public static void prepare() {
- beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
+ BEAM_SQL_ENV.registerTable("ORDER_DETAILS", MockedUnboundedTable
.of(Types.INTEGER, "order_id",
Types.INTEGER, "site_id",
Types.INTEGER, "price",
@@ -88,7 +87,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -121,7 +120,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
// 2, 2 | 2, 5
// 3, 3 | NULL, NULL
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -151,7 +150,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -181,7 +180,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
+ " o1.order_id1=o2.order_id"
;
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
@@ -213,7 +212,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
+ " o1.order_id=o2.order_id"
;
pipeline.enableAbandonedNodeEnforcement(false);
- BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ compilePipeline(sql, pipeline, BEAM_SQL_ENV);
pipeline.run();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
index 425e554..5c4ae2c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
@@ -19,9 +19,8 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -34,7 +33,7 @@ import org.junit.Test;
/**
* Test for {@code BeamMinusRel}.
*/
-public class BeamMinusRelTest {
+public class BeamMinusRelTest extends BaseRelTest {
static BeamSqlEnv sqlEnv = new BeamSqlEnv();
@Rule
@@ -78,7 +77,7 @@ public class BeamMinusRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -100,7 +99,7 @@ public class BeamMinusRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).satisfies(new CheckSize(2));
PAssert.that(rows).containsInAnyOrder(
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
index 4de493a..cd0297a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
@@ -21,9 +21,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.sql.Types;
import java.util.Date;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
@@ -38,7 +37,7 @@ import org.junit.Test;
/**
* Test for {@code BeamSetOperatorRelBase}.
*/
-public class BeamSetOperatorRelBaseTest {
+public class BeamSetOperatorRelBaseTest extends BaseRelTest {
static BeamSqlEnv sqlEnv = new BeamSqlEnv();
@Rule
@@ -71,7 +70,7 @@ public class BeamSetOperatorRelBaseTest {
+ "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+ ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
// compare valueInString to ignore the windowStart & windowEnd
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
@@ -100,7 +99,7 @@ public class BeamSetOperatorRelBaseTest {
// use a real pipeline rather than the TestPipeline because we are
// testing exceptions, the pipeline will not actually run.
Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
- BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv);
+ compilePipeline(sql, pipeline1, sqlEnv);
pipeline.run();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
index f033fa0..19ba0d0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
@@ -20,9 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.sql.Types;
import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -35,7 +34,7 @@ import org.junit.Test;
/**
* Test for {@code BeamSortRel}.
*/
-public class BeamSortRelTest {
+public class BeamSortRelTest extends BaseRelTest {
static BeamSqlEnv sqlEnv = new BeamSqlEnv();
@Rule
@@ -78,7 +77,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 4";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
Types.INTEGER, "site_id",
@@ -117,7 +116,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -155,7 +154,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -178,7 +177,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 4 offset 4";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -201,7 +200,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 11";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -232,6 +231,6 @@ public class BeamSortRelTest {
+ "ORDER BY order_id asc limit 11";
TestPipeline pipeline = TestPipeline.create();
- BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ compilePipeline(sql, pipeline, sqlEnv);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
index 7cc52da..d79a54e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
@@ -19,9 +19,8 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -34,7 +33,7 @@ import org.junit.Test;
/**
* Test for {@code BeamUnionRel}.
*/
-public class BeamUnionRelTest {
+public class BeamUnionRelTest extends BaseRelTest {
static BeamSqlEnv sqlEnv = new BeamSqlEnv();
@Rule
@@ -63,7 +62,7 @@ public class BeamUnionRelTest {
+ " order_id, site_id, price "
+ "FROM ORDER_DETAILS ";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -86,7 +85,7 @@ public class BeamUnionRelTest {
+ " SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
index ff31e55..5604e32 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
@@ -19,9 +19,8 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.sql.Types;
-import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -34,7 +33,7 @@ import org.junit.Test;
/**
* Test for {@code BeamValuesRel}.
*/
-public class BeamValuesRelTest {
+public class BeamValuesRelTest extends BaseRelTest {
static BeamSqlEnv sqlEnv = new BeamSqlEnv();
@Rule
@@ -60,7 +59,7 @@ public class BeamValuesRelTest {
public void testValues() throws Exception {
String sql = "insert into string_table(name, description) values "
+ "('hello', 'world'), ('james', 'bond')";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.VARCHAR, "name",
@@ -76,7 +75,7 @@ public class BeamValuesRelTest {
@Test
public void testValues_castInt() throws Exception {
String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.INTEGER, "c0",
@@ -91,7 +90,7 @@ public class BeamValuesRelTest {
@Test
public void testValues_onlySelect() throws Exception {
String sql = "select 1, '1'";
- PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.INTEGER, "EXPR$0",
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
new file mode 100644
index 0000000..0a320db
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Tests for BeamSqlRowCoder.
+ */
+public class BeamSqlRowCoderTest {
+
+ @Test
+ public void encodeAndDecode() throws Exception {
+ final RelProtoDataType protoRowType = new RelProtoDataType() {
+ @Override
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder()
+ .add("col_tinyint", SqlTypeName.TINYINT)
+ .add("col_smallint", SqlTypeName.SMALLINT)
+ .add("col_integer", SqlTypeName.INTEGER)
+ .add("col_bigint", SqlTypeName.BIGINT)
+ .add("col_float", SqlTypeName.FLOAT)
+ .add("col_double", SqlTypeName.DOUBLE)
+ .add("col_decimal", SqlTypeName.DECIMAL)
+ .add("col_string_varchar", SqlTypeName.VARCHAR)
+ .add("col_time", SqlTypeName.TIME)
+ .add("col_timestamp", SqlTypeName.TIMESTAMP)
+ .add("col_boolean", SqlTypeName.BOOLEAN)
+ .build();
+ }
+ };
+
+ BeamRecordSqlType beamSQLRowType = CalciteUtils.toBeamRowType(
+ protoRowType.apply(new JavaTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT)));
+
+ GregorianCalendar calendar = new GregorianCalendar();
+ calendar.setTime(new Date());
+ BeamRecord row = new BeamRecord(beamSQLRowType
+ , Byte.valueOf("1"), Short.valueOf("1"), 1, 1L, 1.1F, 1.1
+ , BigDecimal.ZERO, "hello", calendar, new Date(), true);
+
+
+ BeamRecordCoder coder = beamSQLRowType.getRecordCoder();
+ CoderProperties.coderDecodeEncodeEqual(coder, row);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java
new file mode 100644
index 0000000..fd88448
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.kafka;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for BeamKafkaCSVTable.
+ */
+public class BeamKafkaCSVTableTest {
+ @Rule
+ public TestPipeline pipeline = TestPipeline.create();
+ public static BeamRecord row1;
+ public static BeamRecord row2;
+
+ @BeforeClass
+ public static void setUp() {
+ row1 = new BeamRecord(genRowType(), 1L, 1, 1.0);
+
+ row2 = new BeamRecord(genRowType(), 2L, 2, 2.0);
+ }
+
+ @Test public void testCsvRecorderDecoder() throws Exception {
+ PCollection<BeamRecord> result = pipeline
+ .apply(
+ Create.of("1,\"1\",1.0", "2,2,2.0")
+ )
+ .apply(ParDo.of(new String2KvBytes()))
+ .apply(
+ new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)
+ );
+
+ PAssert.that(result).containsInAnyOrder(row1, row2);
+
+ pipeline.run();
+ }
+
+ @Test public void testCsvRecorderEncoder() throws Exception {
+ PCollection<BeamRecord> result = pipeline
+ .apply(
+ Create.of(row1, row2)
+ )
+ .apply(
+ new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), CSVFormat.DEFAULT)
+ ).apply(
+ new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)
+ );
+
+ PAssert.that(result).containsInAnyOrder(row1, row2);
+
+ pipeline.run();
+ }
+
+ private static BeamRecordSqlType genRowType() {
+ return CalciteUtils.toBeamRowType(new RelProtoDataType() {
+
+ @Override public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder().add("order_id", SqlTypeName.BIGINT)
+ .add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE).build();
+ }
+ }.apply(BeamQueryPlanner.TYPE_FACTORY));
+ }
+
+ private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>
+ implements Serializable {
+ @ProcessElement
+ public void processElement(ProcessContext ctx) {
+ ctx.output(KV.of(new byte[] {}, ctx.element().getBytes()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java
new file mode 100644
index 0000000..9a57a5f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableTest {
+
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+ @Rule public TestPipeline pipeline2 = TestPipeline.create();
+
+ /**
+ * Shared test data for the CSV table tests.
+ *
+ * <p>
+ * The CSV fields are, in order:
+ * integer, bigint, float, double, string
+ * </p>
+ */
+ private static Object[] data1 = new Object[] { 1, 1L, 1.1F, 1.1, "james" };
+ private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" };
+
+ private static List<Object[]> testData = Arrays.asList(data1, data2);
+ private static List<BeamRecord> testDataRows = new ArrayList<BeamRecord>() {{
+ for (Object[] data : testData) {
+ add(buildRow(data));
+ }
+ }};
+
+ private static Path tempFolder;
+ private static File readerSourceFile;
+ private static File writerTargetFile;
+
+ @Test public void testBuildIOReader() {
+ PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
+ readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
+ PAssert.that(rows).containsInAnyOrder(testDataRows);
+ pipeline.run();
+ }
+
+ @Test public void testBuildIOWriter() {
+ new BeamTextCSVTable(buildBeamSqlRowType(),
+ readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
+ .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath())
+ .buildIOWriter());
+ pipeline.run();
+
+ PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
+ writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
+
+ // confirm the two reads match
+ PAssert.that(rows).containsInAnyOrder(testDataRows);
+ pipeline2.run();
+ }
+
+ @BeforeClass public static void setUp() throws IOException {
+ tempFolder = Files.createTempDirectory("BeamTextTableTest");
+ readerSourceFile = writeToFile(testData, "readerSourceFile.txt");
+ writerTargetFile = writeToFile(testData, "writerTargetFile.txt");
+ }
+
+ @AfterClass public static void teardownClass() throws IOException {
+ Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() {
+
+ @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+ throws IOException {
+ Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc)
+ throws IOException {
+ Files.delete(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ private static File writeToFile(List<Object[]> rows, String filename) throws IOException {
+ File file = tempFolder.resolve(filename).toFile();
+ OutputStream output = new FileOutputStream(file);
+ writeToStreamAndClose(rows, output);
+ return file;
+ }
+
+ /**
+ * Helper that writes the given lines (adding a newline in between) to a stream, then closes the
+ * stream.
+ */
+ private static void writeToStreamAndClose(List<Object[]> rows, OutputStream outputStream) {
+ try (PrintStream writer = new PrintStream(outputStream)) {
+ CSVPrinter printer = CSVFormat.DEFAULT.print(writer);
+ for (Object[] row : rows) {
+ for (Object field : row) {
+ printer.print(field);
+ }
+ printer.println();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private RelProtoDataType buildRowType() {
+ return new RelProtoDataType() {
+
+ @Override public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder().add("id", SqlTypeName.INTEGER).add("order_id", SqlTypeName.BIGINT)
+ .add("price", SqlTypeName.FLOAT).add("amount", SqlTypeName.DOUBLE)
+ .add("user_name", SqlTypeName.VARCHAR).build();
+ }
+ };
+ }
+
+ private static RelDataType buildRelDataType() {
+ return BeamQueryPlanner.TYPE_FACTORY.builder().add("id", SqlTypeName.INTEGER)
+ .add("order_id", SqlTypeName.BIGINT).add("price", SqlTypeName.FLOAT)
+ .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build();
+ }
+
+ private static BeamRecordSqlType buildBeamSqlRowType() {
+ return CalciteUtils.toBeamRowType(buildRelDataType());
+ }
+
+ private static BeamRecord buildRow(Object[] data) {
+ return new BeamRecord(buildBeamSqlRowType(), Arrays.asList(data));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java
new file mode 100644
index 0000000..948e86c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.transform;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlAvgAggFunction;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
+import org.apache.calcite.sql.fun.SqlSumAggFunction;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamAggregationTransforms}.
+ *
+ */
+public class BeamAggregationTransformTest extends BeamTransformBaseTest{
+
+ @Rule
+ public TestPipeline p = TestPipeline.create();
+
+ private List<AggregateCall> aggCalls;
+
+ private BeamRecordSqlType keyType;
+ private BeamRecordSqlType aggPartType;
+ private BeamRecordSqlType outputType;
+
+ private BeamRecordCoder inRecordCoder;
+ private BeamRecordCoder keyCoder;
+ private BeamRecordCoder aggCoder;
+ private BeamRecordCoder outRecordCoder;
+
+ /**
+ * This step is equivalent to the SQL query below.
+ * <pre>
+ * SELECT `f_int`
+ * , COUNT(*) AS `size`
+ * , SUM(`f_long`) AS `sum1`, AVG(`f_long`) AS `avg1`
+ * , MAX(`f_long`) AS `max1`, MIN(`f_long`) AS `min1`
+ * , SUM(`f_short`) AS `sum2`, AVG(`f_short`) AS `avg2`
+ * , MAX(`f_short`) AS `max2`, MIN(`f_short`) AS `min2`
+ * , SUM(`f_byte`) AS `sum3`, AVG(`f_byte`) AS `avg3`
+ * , MAX(`f_byte`) AS `max3`, MIN(`f_byte`) AS `min3`
+ * , SUM(`f_float`) AS `sum4`, AVG(`f_float`) AS `avg4`
+ * , MAX(`f_float`) AS `max4`, MIN(`f_float`) AS `min4`
+ * , SUM(`f_double`) AS `sum5`, AVG(`f_double`) AS `avg5`
+ * , MAX(`f_double`) AS `max5`, MIN(`f_double`) AS `min5`
+ * , MAX(`f_timestamp`) AS `max7`, MIN(`f_timestamp`) AS `min7`
+ * ,SUM(`f_int2`) AS `sum8`, AVG(`f_int2`) AS `avg8`
+ * , MAX(`f_int2`) AS `max8`, MIN(`f_int2`) AS `min8`
+ * FROM TABLE_NAME
+ * GROUP BY `f_int`
+ * </pre>
+ * @throws ParseException
+ */
+ @Test
+ public void testCountPerElementBasic() throws ParseException {
+ setupEnvironment();
+
+ PCollection<BeamRecord> input = p.apply(Create.of(inputRows));
+
+ //1. extract fields in group-by key part
+ PCollection<KV<BeamRecord, BeamRecord>> exGroupByStream = input.apply("exGroupBy",
+ WithKeys
+ .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
+ .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, inRecordCoder));
+
+ //2. apply a GroupByKey.
+ PCollection<KV<BeamRecord, Iterable<BeamRecord>>> groupedStream = exGroupByStream
+ .apply("groupBy", GroupByKey.<BeamRecord, BeamRecord>create())
+ .setCoder(KvCoder.<BeamRecord, Iterable<BeamRecord>>of(keyCoder,
+ IterableCoder.<BeamRecord>of(inRecordCoder)));
+
+ //3. run aggregation functions
+ PCollection<KV<BeamRecord, BeamRecord>> aggregatedStream = groupedStream.apply("aggregation",
+ Combine.<BeamRecord, BeamRecord, BeamRecord>groupedValues(
+ new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType)))
+ .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, aggCoder));
+
+ //4. flat KV to a single record
+ PCollection<BeamRecord> mergedStream = aggregatedStream.apply("mergeRecord",
+ ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1)));
+ mergedStream.setCoder(outRecordCoder);
+
+ //assert function BeamAggregationTransform.AggregationGroupByKeyFn
+ PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn());
+
+ //assert BeamAggregationTransform.AggregationCombineFn
+ PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn());
+
+ //assert BeamAggregationTransform.MergeAggregationRecord
+ PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRecord());
+
+ p.run();
+}
+
+ private void setupEnvironment() {
+ prepareAggregationCalls();
+ prepareTypeAndCoder();
+ }
+
+ /**
+ * create list of all {@link AggregateCall}.
+ */
+ @SuppressWarnings("deprecation")
+ private void prepareAggregationCalls() {
+ //aggregations for all data type
+ aggCalls = new ArrayList<>();
+ aggCalls.add(
+ new AggregateCall(new SqlCountAggFunction(), false,
+ Arrays.<Integer>asList(),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
+ "count")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlSumAggFunction(
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT)), false,
+ Arrays.<Integer>asList(1),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
+ "sum1")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+ Arrays.<Integer>asList(1),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
+ "avg1")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+ Arrays.<Integer>asList(1),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
+ "max1")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+ Arrays.<Integer>asList(1),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
+ "min1")
+ );
+
+ aggCalls.add(
+ new AggregateCall(new SqlSumAggFunction(
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT)), false,
+ Arrays.<Integer>asList(2),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
+ "sum2")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+ Arrays.<Integer>asList(2),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
+ "avg2")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+ Arrays.<Integer>asList(2),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
+ "max2")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+ Arrays.<Integer>asList(2),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
+ "min2")
+ );
+
+ aggCalls.add(
+ new AggregateCall(
+ new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT)),
+ false,
+ Arrays.<Integer>asList(3),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
+ "sum3")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+ Arrays.<Integer>asList(3),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
+ "avg3")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+ Arrays.<Integer>asList(3),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
+ "max3")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+ Arrays.<Integer>asList(3),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
+ "min3")
+ );
+
+ aggCalls.add(
+ new AggregateCall(
+ new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT)),
+ false,
+ Arrays.<Integer>asList(4),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
+ "sum4")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+ Arrays.<Integer>asList(4),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
+ "avg4")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+ Arrays.<Integer>asList(4),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
+ "max4")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+ Arrays.<Integer>asList(4),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
+ "min4")
+ );
+
+ aggCalls.add(
+ new AggregateCall(
+ new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE)),
+ false,
+ Arrays.<Integer>asList(5),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
+ "sum5")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+ Arrays.<Integer>asList(5),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
+ "avg5")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+ Arrays.<Integer>asList(5),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
+ "max5")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+ Arrays.<Integer>asList(5),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
+ "min5")
+ );
+
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+ Arrays.<Integer>asList(7),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
+ "max7")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+ Arrays.<Integer>asList(7),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
+ "min7")
+ );
+
+ aggCalls.add(
+ new AggregateCall(
+ new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER)),
+ false,
+ Arrays.<Integer>asList(8),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
+ "sum8")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
+ Arrays.<Integer>asList(8),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
+ "avg8")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
+ Arrays.<Integer>asList(8),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
+ "max8")
+ );
+ aggCalls.add(
+ new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
+ Arrays.<Integer>asList(8),
+ new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
+ "min8")
+ );
+ }
+
+ /**
+ * Coders used in aggregation steps.
+ */
+ private void prepareTypeAndCoder() {
+ inRecordCoder = inputRowType.getRecordCoder();
+
+ keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
+ keyCoder = keyType.getRecordCoder();
+
+ aggPartType = initTypeOfSqlRow(
+ Arrays.asList(KV.of("count", SqlTypeName.BIGINT),
+
+ KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
+ KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
+
+ KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
+ KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
+
+ KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
+ KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
+
+ KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
+ KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
+
+ KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
+ KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
+
+ KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
+
+ KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
+ KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
+ ));
+ aggCoder = aggPartType.getRecordCoder();
+
+ outputType = prepareFinalRowType();
+ outRecordCoder = outputType.getRecordCoder();
+ }
+
+ /**
+ * Expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
+ */
+ private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationGroupByKeyFn() {
+ return Arrays.asList(
+ KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
+ inputRows.get(0)),
+ KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
+ inputRows.get(1)),
+ KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
+ inputRows.get(2)),
+ KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
+ inputRows.get(3)));
+ }
+
+ /**
+ * Expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
+ */
+ private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationCombineFn()
+ throws ParseException {
+ return Arrays.asList(
+ KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
+ new BeamRecord(aggPartType, Arrays.<Object>asList(
+ 4L,
+ 10000L, 2500L, 4000L, 1000L,
+ (short) 10, (short) 2, (short) 4, (short) 1,
+ (byte) 10, (byte) 2, (byte) 4, (byte) 1,
+ 10.0F, 2.5F, 4.0F, 1.0F,
+ 10.0, 2.5, 4.0, 1.0,
+ format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
+ 10, 2, 4, 1
+ )))
+ );
+ }
+
+ /**
+ * Row type of final output row.
+ */
+ private BeamRecordSqlType prepareFinalRowType() {
+ FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
+ List<KV<String, SqlTypeName>> columnMetadata =
+ Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
+
+ KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
+ KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
+
+ KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
+ KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
+
+ KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
+ KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
+
+ KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
+ KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
+
+ KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
+ KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
+
+ KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
+
+ KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
+ KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
+ );
+ for (KV<String, SqlTypeName> cm : columnMetadata) {
+ builder.add(cm.getKey(), cm.getValue());
+ }
+ return CalciteUtils.toBeamRowType(builder.build());
+ }
+
+ /**
+ * Expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}.
+ */
+ private BeamRecord prepareResultOfMergeAggregationRecord() throws ParseException {
+ return new BeamRecord(outputType, Arrays.<Object>asList(
+ 1, 4L,
+ 10000L, 2500L, 4000L, 1000L,
+ (short) 10, (short) 2, (short) 4, (short) 1,
+ (byte) 10, (byte) 2, (byte) 4, (byte) 1,
+ 10.0F, 2.5F, 4.0F, 1.0F,
+ 10.0, 2.5, 4.0, 1.0,
+ format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
+ 10, 2, 4, 1
+ ));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java
new file mode 100644
index 0000000..3c8f040
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.transform;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+
+/**
+ * Shared methods to test PTransforms which execute Beam SQL steps.
+ *
+ */
+public class BeamTransformBaseTest {
+ public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ public static BeamRecordSqlType inputRowType;
+ public static List<BeamRecord> inputRows;
+
+ @BeforeClass
+ public static void prepareInput() throws NumberFormatException, ParseException{
+ List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList(
+ KV.of("f_int", SqlTypeName.INTEGER), KV.of("f_long", SqlTypeName.BIGINT),
+ KV.of("f_short", SqlTypeName.SMALLINT), KV.of("f_byte", SqlTypeName.TINYINT),
+ KV.of("f_float", SqlTypeName.FLOAT), KV.of("f_double", SqlTypeName.DOUBLE),
+ KV.of("f_string", SqlTypeName.VARCHAR), KV.of("f_timestamp", SqlTypeName.TIMESTAMP),
+ KV.of("f_int2", SqlTypeName.INTEGER)
+ );
+ inputRowType = initTypeOfSqlRow(columnMetadata);
+ inputRows = Arrays.asList(
+ initBeamSqlRow(columnMetadata,
+ Arrays.<Object>asList(1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0F, 1.0,
+ "string_row1", format.parse("2017-01-01 01:01:03"), 1)),
+ initBeamSqlRow(columnMetadata,
+ Arrays.<Object>asList(1, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0F, 2.0,
+ "string_row2", format.parse("2017-01-01 01:02:03"), 2)),
+ initBeamSqlRow(columnMetadata,
+ Arrays.<Object>asList(1, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0F, 3.0,
+ "string_row3", format.parse("2017-01-01 01:03:03"), 3)),
+ initBeamSqlRow(columnMetadata, Arrays.<Object>asList(1, 4000L, Short.valueOf("4"),
+ Byte.valueOf("4"), 4.0F, 4.0, "string_row4", format.parse("2017-01-01 02:04:03"), 4)));
+ }
+
+ /**
+ * Create a {@code BeamRecordSqlType} for the given column metadata.
+ */
+ public static BeamRecordSqlType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
+ FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
+ for (KV<String, SqlTypeName> cm : columnMetadata) {
+ builder.add(cm.getKey(), cm.getValue());
+ }
+ return CalciteUtils.toBeamRowType(builder.build());
+ }
+
+ /**
+ * Create an empty row with given column metadata.
+ */
+ public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
+ return initBeamSqlRow(columnMetadata, Arrays.asList());
+ }
+
+ /**
+ * Create a row with given column metadata, and values for each column.
+ *
+ */
+ public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
+ List<Object> rowValues){
+ BeamRecordSqlType rowType = initTypeOfSqlRow(columnMetadata);
+
+ return new BeamRecord(rowType, rowValues);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index 5898e2e..a64afa6 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -29,10 +29,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.BeamRecord;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
index 4ce2f45..a836f79 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
@@ -21,8 +21,8 @@ package org.apache.beam.sdk.extensions.sql.integrationtest;
import java.math.BigDecimal;
import java.sql.Types;
import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
index 60e8211..cf66268 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
@@ -25,8 +25,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
index 426789c..d661866 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
@@ -19,8 +19,8 @@
package org.apache.beam.sdk.extensions.sql.mock;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
index 465705d..31234e1 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
@@ -22,9 +22,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;