You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/04 17:09:46 UTC
[3/7] beam git commit: refactor BeamRecord, BeamRecordType,
BeamSqlRecordType, BeamRecordCoder
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 46cab09..e3c6aec 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -20,11 +20,11 @@ package org.apache.beam.sdk.extensions.sql;
import java.sql.Types;
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
@@ -39,24 +39,24 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
*/
@Test
public void testUdaf() throws Exception {
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"),
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"),
Arrays.asList(Types.INTEGER, Types.INTEGER));
- BeamSqlRow record = new BeamSqlRow(resultType);
+ BeamRecord record = new BeamRecord(resultType);
record.addField("f_int2", 0);
record.addField("squaresum", 30);
String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`"
+ " FROM PCOLLECTION GROUP BY f_int2";
- PCollection<BeamSqlRow> result1 =
+ PCollection<BeamRecord> result1 =
boundedInput1.apply("testUdaf1",
BeamSql.simpleQuery(sql1).withUdaf("squaresum1", SquareSum.class));
PAssert.that(result1).containsInAnyOrder(record);
String sql2 = "SELECT f_int2, squaresum2(f_int) AS `squaresum`"
+ " FROM PCOLLECTION GROUP BY f_int2";
- PCollection<BeamSqlRow> result2 =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
+ PCollection<BeamRecord> result2 =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1)
.apply("testUdaf2",
BeamSql.query(sql2).withUdaf("squaresum2", SquareSum.class));
PAssert.that(result2).containsInAnyOrder(record);
@@ -69,22 +69,22 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
*/
@Test
public void testUdf() throws Exception{
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"),
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"),
Arrays.asList(Types.INTEGER, Types.INTEGER));
- BeamSqlRow record = new BeamSqlRow(resultType);
+ BeamRecord record = new BeamRecord(resultType);
record.addField("f_int", 2);
record.addField("cubicvalue", 8);
String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
- PCollection<BeamSqlRow> result1 =
+ PCollection<BeamRecord> result1 =
boundedInput1.apply("testUdf1",
BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class));
PAssert.that(result1).containsInAnyOrder(record);
String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
- PCollection<BeamSqlRow> result2 =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
+ PCollection<BeamRecord> result2 =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1)
.apply("testUdf2",
BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class));
PAssert.that(result2).containsInAnyOrder(record);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 9995b0a..63b6ca8 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -21,9 +21,9 @@ package org.apache.beam.sdk.extensions.sql;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.BeamRecord;
/**
* Test utilities.
@@ -32,7 +32,7 @@ public class TestUtils {
/**
   * A {@code DoFn} to convert a {@code BeamRecord} to a comparable {@code String}.
*/
- public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> {
+ public static class BeamSqlRow2StringDoFn extends DoFn<BeamRecord, String> {
@ProcessElement
public void processElement(ProcessContext ctx) {
ctx.output(ctx.element().valueInString());
@@ -42,9 +42,9 @@ public class TestUtils {
/**
   * Convert list of {@code BeamRecord} to list of {@code String}.
*/
- public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) {
+ public static List<String> beamSqlRows2Strings(List<BeamRecord> rows) {
List<String> strs = new ArrayList<>();
- for (BeamSqlRow row : rows) {
+ for (BeamRecord row : rows) {
strs.add(row.valueInString());
}
@@ -69,8 +69,8 @@ public class TestUtils {
* {@code}
*/
public static class RowsBuilder {
- private BeamSqlRowType type;
- private List<BeamSqlRow> rows = new ArrayList<>();
+ private BeamSqlRecordType type;
+ private List<BeamRecord> rows = new ArrayList<>();
/**
* Create a RowsBuilder with the specified row type info.
@@ -86,7 +86,7 @@ public class TestUtils {
   * @param args pairs of column types and column names.
*/
public static RowsBuilder of(final Object... args) {
- BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args);
+ BeamSqlRecordType beamSQLRowType = buildBeamSqlRowType(args);
RowsBuilder builder = new RowsBuilder();
builder.type = beamSQLRowType;
@@ -103,7 +103,7 @@ public class TestUtils {
* )}</pre>
   * @param beamSQLRowType the record type.
*/
- public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) {
+ public static RowsBuilder of(final BeamSqlRecordType beamSQLRowType) {
RowsBuilder builder = new RowsBuilder();
builder.type = beamSQLRowType;
@@ -130,7 +130,7 @@ public class TestUtils {
return this;
}
- public List<BeamSqlRow> getRows() {
+ public List<BeamRecord> getRows() {
return rows;
}
@@ -153,7 +153,7 @@ public class TestUtils {
* )
* }</pre>
*/
- public static BeamSqlRowType buildBeamSqlRowType(Object... args) {
+ public static BeamSqlRecordType buildBeamSqlRowType(Object... args) {
List<Integer> types = new ArrayList<>();
List<String> names = new ArrayList<>();
@@ -162,7 +162,7 @@ public class TestUtils {
names.add((String) args[i + 1]);
}
- return BeamSqlRowType.create(names, types);
+ return BeamSqlRecordType.create(names, types);
}
/**
@@ -179,12 +179,12 @@ public class TestUtils {
* )
* }</pre>
*/
- public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) {
- List<BeamSqlRow> rows = new ArrayList<>();
+ public static List<BeamRecord> buildRows(BeamSqlRecordType type, List args) {
+ List<BeamRecord> rows = new ArrayList<>();
int fieldCount = type.size();
for (int i = 0; i < args.size(); i += fieldCount) {
- BeamSqlRow row = new BeamSqlRow(type);
+ BeamRecord row = new BeamRecord(type);
for (int j = 0; j < fieldCount; j++) {
row.addField(j, args.get(i + j));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index 388c556..4da7790 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
@@ -57,8 +57,8 @@ public class BeamSqlFnExecutorTestBase {
RelDataTypeSystem.DEFAULT);
public static RelDataType relDataType;
- public static BeamSqlRowType beamRowType;
- public static BeamSqlRow record;
+ public static BeamSqlRecordType beamRowType;
+ public static BeamRecord record;
public static RelBuilder relBuilder;
@@ -71,7 +71,7 @@ public class BeamSqlFnExecutorTestBase {
.add("order_time", SqlTypeName.BIGINT).build();
beamRowType = CalciteUtils.toBeamRowType(relDataType);
- record = new BeamSqlRow(beamRowType);
+ record = new BeamRecord(beamRowType);
record.addField(0, 1234567L);
record.addField(1, 0);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
index 5a3f65d..a51cc30 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
@@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -77,7 +77,7 @@ public class BeamIntersectRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -100,7 +100,7 @@ public class BeamIntersectRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).satisfies(new CheckSize(3));
PAssert.that(rows).containsInAnyOrder(
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
index c4f6350..dde1540 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -77,7 +77,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
+ " o1.order_id=o2.site_id AND o2.price=o1.site_id"
;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.INTEGER, "order_id",
@@ -102,7 +102,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
+ " o1.order_id=o2.site_id AND o2.price=o1.site_id"
;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
pipeline.enableAbandonedNodeEnforcement(false);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -130,7 +130,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
+ " o1.order_id=o2.site_id AND o2.price=o1.site_id"
;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.INTEGER, "order_id",
@@ -157,7 +157,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
+ " o1.order_id=o2.site_id AND o2.price=o1.site_id"
;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.INTEGER, "order_id",
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 1dbd8b4..28ad99c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -26,10 +26,10 @@ import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.BeforeClass;
@@ -98,7 +98,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -124,7 +124,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -150,7 +150,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld")));
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
@@ -192,7 +192,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
+ " on "
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
index 5e5e416..a5a2e85 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -25,10 +25,10 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.BeforeClass;
@@ -88,7 +88,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -121,7 +121,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
// 2, 2 | 2, 5
// 3, 3 | NULL, NULL
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -151,7 +151,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
+ " o1.order_id=o2.order_id"
;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
TestUtils.RowsBuilder.of(
@@ -181,7 +181,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
+ " o1.order_id1=o2.order_id"
;
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
index 9149dd4..425e554 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
@@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -78,7 +78,7 @@ public class BeamMinusRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -100,7 +100,7 @@ public class BeamMinusRelTest {
+ "SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS2 ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).satisfies(new CheckSize(2));
PAssert.that(rows).containsInAnyOrder(
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
index 36538c0..4de493a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
@@ -25,11 +25,11 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -71,7 +71,7 @@ public class BeamSetOperatorRelBaseTest {
+ "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+ ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
// compare valueInString to ignore the windowStart & windowEnd
PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
.containsInAnyOrder(
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
index 15e3b89..f033fa0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
@@ -24,9 +24,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Before;
import org.junit.Rule;
@@ -78,7 +78,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 4";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
Types.INTEGER, "site_id",
@@ -117,7 +117,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -155,7 +155,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -178,7 +178,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 4 offset 4";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -201,7 +201,7 @@ public class BeamSortRelTest {
+ "FROM ORDER_DETAILS "
+ "ORDER BY order_id asc, site_id desc limit 11";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
index c232b30..7cc52da 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
@@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -63,7 +63,7 @@ public class BeamUnionRelTest {
+ " order_id, site_id, price "
+ "FROM ORDER_DETAILS ";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
@@ -86,7 +86,7 @@ public class BeamUnionRelTest {
+ " SELECT order_id, site_id, price "
+ "FROM ORDER_DETAILS";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.BIGINT, "order_id",
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
index e5fa864..ff31e55 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
@@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -60,7 +60,7 @@ public class BeamValuesRelTest {
public void testValues() throws Exception {
String sql = "insert into string_table(name, description) values "
+ "('hello', 'world'), ('james', 'bond')";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.VARCHAR, "name",
@@ -76,7 +76,7 @@ public class BeamValuesRelTest {
@Test
public void testValues_castInt() throws Exception {
String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.INTEGER, "c0",
@@ -91,7 +91,7 @@ public class BeamValuesRelTest {
@Test
public void testValues_onlySelect() throws Exception {
String sql = "select 1, '1'";
- PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
+ PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder.of(
Types.INTEGER, "EXPR$0",
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java
index 8cdf2cd..7407a76 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java
@@ -18,21 +18,21 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.BeamRecord;
import org.junit.Assert;
/**
* Utility class to check size of BeamSQLRow iterable.
*/
-public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
+public class CheckSize implements SerializableFunction<Iterable<BeamRecord>, Void> {
private int size;
public CheckSize(int size) {
this.size = size;
}
- @Override public Void apply(Iterable<BeamSqlRow> input) {
+ @Override public Void apply(Iterable<BeamRecord> input) {
int count = 0;
- for (BeamSqlRow row : input) {
+ for (BeamRecord row : input) {
count++;
}
Assert.assertEquals(size, count);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index ffc6833..b58a17f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -32,11 +32,10 @@ import java.util.TimeZone;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.calcite.util.Pair;
import org.junit.Rule;
@@ -62,8 +61,8 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- protected PCollection<BeamSqlRow> getTestPCollection() {
- BeamSqlRowType type = BeamSqlRowType.create(
+ protected PCollection<BeamRecord> getTestPCollection() {
+ BeamSqlRecordType type = BeamSqlRecordType.create(
Arrays.asList("ts", "c_tinyint", "c_smallint",
"c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
"c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
@@ -89,7 +88,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
9223372036854775807L
)
.buildIOReader(pipeline)
- .setCoder(new BeamSqlRowCoder(type));
+ .setCoder(type.getRecordCoder());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -140,7 +139,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
* Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result.
*/
public void buildRunAndCheck() {
- PCollection<BeamSqlRow> inputCollection = getTestPCollection();
+ PCollection<BeamRecord> inputCollection = getTestPCollection();
System.out.println("SQL:>\n" + getSql());
try {
List<String> names = new ArrayList<>();
@@ -153,10 +152,10 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
values.add(pair.getValue());
}
- PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
+ PCollection<BeamRecord> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder
- .of(BeamSqlRowType.create(names, types))
+ .of(BeamSqlRecordType.create(names, types))
.addRows(values)
.getRows()
);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
index 14de5b6..3569e31 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
@@ -22,9 +22,8 @@ import java.math.BigDecimal;
import java.sql.Types;
import java.util.Arrays;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Test;
@@ -282,8 +281,8 @@ public class BeamSqlComparisonOperatorsIntegrationTest
checker.buildRunAndCheck();
}
- @Override protected PCollection<BeamSqlRow> getTestPCollection() {
- BeamSqlRowType type = BeamSqlRowType.create(
+ @Override protected PCollection<BeamRecord> getTestPCollection() {
+ BeamSqlRecordType type = BeamSqlRecordType.create(
Arrays.asList(
"c_tinyint_0", "c_tinyint_1", "c_tinyint_2",
"c_smallint_0", "c_smallint_1", "c_smallint_2",
@@ -322,7 +321,7 @@ public class BeamSqlComparisonOperatorsIntegrationTest
false, true
)
.buildIOReader(pipeline)
- .setCoder(new BeamSqlRowCoder(type));
+ .setCoder(type.getRecordCoder());
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
index 181c991..cda6a3c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -24,9 +24,9 @@ import static org.junit.Assert.assertTrue;
import java.util.Date;
import java.util.Iterator;
import org.apache.beam.sdk.extensions.sql.BeamSql;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Test;
@@ -63,17 +63,17 @@ public class BeamSqlDateFunctionsIntegrationTest
+ "CURRENT_TIMESTAMP as c3"
+ " FROM PCOLLECTION"
;
- PCollection<BeamSqlRow> rows = getTestPCollection().apply(
+ PCollection<BeamRecord> rows = getTestPCollection().apply(
BeamSql.simpleQuery(sql));
PAssert.that(rows).satisfies(new Checker());
pipeline.run();
}
- private static class Checker implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
- @Override public Void apply(Iterable<BeamSqlRow> input) {
- Iterator<BeamSqlRow> iter = input.iterator();
+ private static class Checker implements SerializableFunction<Iterable<BeamRecord>, Void> {
+ @Override public Void apply(Iterable<BeamRecord> input) {
+ Iterator<BeamRecord> iter = input.iterator();
assertTrue(iter.hasNext());
- BeamSqlRow row = iter.next();
+ BeamRecord row = iter.next();
// LOCALTIME
Date date = new Date();
assertTrue(date.getTime() - row.getGregorianCalendar(0).getTime().getTime() < 1000);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
index c7c26eb..073ca52 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
@@ -26,12 +26,12 @@ import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -41,11 +41,11 @@ import org.apache.beam.sdk.values.PDone;
*/
public class MockedBoundedTable extends MockedTable {
/** rows written to this table. */
- private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
+ private static final ConcurrentLinkedQueue<BeamRecord> CONTENT = new ConcurrentLinkedQueue<>();
/** rows flow out from this table. */
- private final List<BeamSqlRow> rows = new ArrayList<>();
+ private final List<BeamRecord> rows = new ArrayList<>();
- public MockedBoundedTable(BeamSqlRowType beamSqlRowType) {
+ public MockedBoundedTable(BeamSqlRecordType beamSqlRowType) {
super(beamSqlRowType);
}
@@ -69,7 +69,7 @@ public class MockedBoundedTable extends MockedTable {
/**
* Build a mocked bounded table with the specified type.
*/
- public static MockedBoundedTable of(final BeamSqlRowType type) {
+ public static MockedBoundedTable of(final BeamSqlRecordType type) {
return new MockedBoundedTable(type);
}
@@ -88,7 +88,7 @@ public class MockedBoundedTable extends MockedTable {
* }</pre>
*/
public MockedBoundedTable addRows(Object... args) {
- List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args));
+ List<BeamRecord> rows = buildRows(getRowType(), Arrays.asList(args));
this.rows.addAll(rows);
return this;
}
@@ -99,12 +99,12 @@ public class MockedBoundedTable extends MockedTable {
}
@Override
- public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
return PBegin.in(pipeline).apply(
"MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows));
}
- @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ @Override public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
return new OutputStore();
}
@@ -112,11 +112,11 @@ public class MockedBoundedTable extends MockedTable {
* Keep output in {@code CONTENT} for validation.
*
*/
- public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> {
+ public static class OutputStore extends PTransform<PCollection<BeamRecord>, PDone> {
@Override
- public PDone expand(PCollection<BeamSqlRow> input) {
- input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() {
+ public PDone expand(PCollection<BeamRecord> input) {
+ input.apply(ParDo.of(new DoFn<BeamRecord, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {
CONTENT.add(c.element());
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
index 6017ee7..59fc6e1 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java
@@ -20,9 +20,9 @@ package org.apache.beam.sdk.extensions.sql.mock;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -31,12 +31,12 @@ import org.apache.beam.sdk.values.PDone;
*/
public abstract class MockedTable extends BaseBeamTable {
public static final AtomicInteger COUNTER = new AtomicInteger();
- public MockedTable(BeamSqlRowType beamSqlRowType) {
+ public MockedTable(BeamSqlRecordType beamSqlRowType) {
super(beamSqlRowType);
}
@Override
- public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
throw new UnsupportedOperationException("buildIOWriter unsupported!");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
index f9ea2ac..6194264 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
@@ -24,10 +24,9 @@ import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.calcite.util.Pair;
@@ -39,10 +38,10 @@ import org.joda.time.Instant;
*/
public class MockedUnboundedTable extends MockedTable {
/** rows flow out from this table with the specified watermark instant. */
- private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
+ private final List<Pair<Duration, List<BeamRecord>>> timestampedRows = new ArrayList<>();
/** specify the index of column in the row which stands for the event time field. */
private int timestampField;
- private MockedUnboundedTable(BeamSqlRowType beamSqlRowType) {
+ private MockedUnboundedTable(BeamSqlRecordType beamSqlRowType) {
super(beamSqlRowType);
}
@@ -83,7 +82,7 @@ public class MockedUnboundedTable extends MockedTable {
* }</pre>
*/
public MockedUnboundedTable addRows(Duration duration, Object... args) {
- List<BeamSqlRow> rows = TestUtils.buildRows(getRowType(), Arrays.asList(args));
+ List<BeamRecord> rows = TestUtils.buildRows(getRowType(), Arrays.asList(args));
// record the watermark + rows
this.timestampedRows.add(Pair.of(duration, rows));
return this;
@@ -93,11 +92,10 @@ public class MockedUnboundedTable extends MockedTable {
return BeamIOType.UNBOUNDED;
}
- @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
- TestStream.Builder<BeamSqlRow> values = TestStream.create(
- new BeamSqlRowCoder(beamSqlRowType));
+ @Override public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
+ TestStream.Builder<BeamRecord> values = TestStream.create(beamSqlRowType.getRecordCoder());
- for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
+ for (Pair<Duration, List<BeamRecord>> pair : timestampedRows) {
values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));
for (int i = 0; i < pair.getValue().size(); i++) {
values = values.addElements(TimestampedValue.of(pair.getValue().get(i),
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
index ddff819..08f98c3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
@@ -21,8 +21,10 @@ package org.apache.beam.sdk.extensions.sql.schema;
import java.math.BigDecimal;
import java.util.Date;
import java.util.GregorianCalendar;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -57,10 +59,10 @@ public class BeamSqlRowCoderTest {
}
};
- BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(
+ BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(
protoRowType.apply(new JavaTypeFactoryImpl(
RelDataTypeSystem.DEFAULT)));
- BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
+ BeamRecord row = new BeamRecord(beamSQLRowType);
row.addField("col_tinyint", Byte.valueOf("1"));
row.addField("col_smallint", Short.valueOf("1"));
row.addField("col_integer", 1);
@@ -76,7 +78,7 @@ public class BeamSqlRowCoderTest {
row.addField("col_boolean", true);
- BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRowType);
+ BeamRecordCoder coder = beamSQLRowType.getRecordCoder();
CoderProperties.coderDecodeEncodeEqual(coder, row);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
index 05af36c..2fc013d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
@@ -21,13 +21,13 @@ package org.apache.beam.sdk.extensions.sql.schema.kafka;
import java.io.Serializable;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.calcite.rel.type.RelDataType;
@@ -45,8 +45,8 @@ import org.junit.Test;
public class BeamKafkaCSVTableTest {
@Rule
public TestPipeline pipeline = TestPipeline.create();
- public static BeamSqlRow row1 = new BeamSqlRow(genRowType());
- public static BeamSqlRow row2 = new BeamSqlRow(genRowType());
+ public static BeamRecord row1 = new BeamRecord(genRowType());
+ public static BeamRecord row2 = new BeamRecord(genRowType());
@BeforeClass
public static void setUp() {
@@ -60,7 +60,7 @@ public class BeamKafkaCSVTableTest {
}
@Test public void testCsvRecorderDecoder() throws Exception {
- PCollection<BeamSqlRow> result = pipeline
+ PCollection<BeamRecord> result = pipeline
.apply(
Create.of("1,\"1\",1.0", "2,2,2.0")
)
@@ -75,7 +75,7 @@ public class BeamKafkaCSVTableTest {
}
@Test public void testCsvRecorderEncoder() throws Exception {
- PCollection<BeamSqlRow> result = pipeline
+ PCollection<BeamRecord> result = pipeline
.apply(
Create.of(row1, row2)
)
@@ -90,7 +90,7 @@ public class BeamKafkaCSVTableTest {
pipeline.run();
}
- private static BeamSqlRowType genRowType() {
+ private static BeamSqlRecordType genRowType() {
return CalciteUtils.toBeamRowType(new RelProtoDataType() {
@Override public RelDataType apply(RelDataTypeFactory a0) {
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
index 79e3d6d..4a39f7c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java
@@ -33,10 +33,10 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -69,7 +69,7 @@ public class BeamTextCSVTableTest {
private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" };
private static List<Object[]> testData = Arrays.asList(data1, data2);
- private static List<BeamSqlRow> testDataRows = new ArrayList<BeamSqlRow>() {{
+ private static List<BeamRecord> testDataRows = new ArrayList<BeamRecord>() {{
for (Object[] data : testData) {
add(buildRow(data));
}
@@ -80,7 +80,7 @@ public class BeamTextCSVTableTest {
private static File writerTargetFile;
@Test public void testBuildIOReader() {
- PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
+ PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
PAssert.that(rows).containsInAnyOrder(testDataRows);
pipeline.run();
@@ -93,7 +93,7 @@ public class BeamTextCSVTableTest {
.buildIOWriter());
pipeline.run();
- PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
+ PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
// confirm the two reads match
@@ -166,11 +166,11 @@ public class BeamTextCSVTableTest {
.add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build();
}
- private static BeamSqlRowType buildBeamSqlRowType() {
+ private static BeamSqlRecordType buildBeamSqlRowType() {
return CalciteUtils.toBeamRowType(buildRelDataType());
}
- private static BeamSqlRow buildRow(Object[] data) {
- return new BeamSqlRow(buildBeamSqlRowType(), Arrays.asList(data));
+ private static BeamRecord buildRow(Object[] data) {
+ return new BeamRecord(buildBeamSqlRowType(), Arrays.asList(data));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
index 821abc9..dca2ad7 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java
@@ -21,14 +21,13 @@ import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -36,6 +35,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.calcite.rel.core.AggregateCall;
@@ -63,14 +63,14 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
private List<AggregateCall> aggCalls;
- private BeamSqlRowType keyType;
- private BeamSqlRowType aggPartType;
- private BeamSqlRowType outputType;
+ private BeamSqlRecordType keyType;
+ private BeamSqlRecordType aggPartType;
+ private BeamSqlRecordType outputType;
- private BeamSqlRowCoder inRecordCoder;
- private BeamSqlRowCoder keyCoder;
- private BeamSqlRowCoder aggCoder;
- private BeamSqlRowCoder outRecordCoder;
+ private BeamRecordCoder inRecordCoder;
+ private BeamRecordCoder keyCoder;
+ private BeamRecordCoder aggCoder;
+ private BeamRecordCoder outRecordCoder;
/**
* This step equals to below query.
@@ -99,28 +99,28 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
public void testCountPerElementBasic() throws ParseException {
setupEnvironment();
- PCollection<BeamSqlRow> input = p.apply(Create.of(inputRows));
+ PCollection<BeamRecord> input = p.apply(Create.of(inputRows));
//1. extract fields in group-by key part
- PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = input.apply("exGroupBy",
+ PCollection<KV<BeamRecord, BeamRecord>> exGroupByStream = input.apply("exGroupBy",
WithKeys
.of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
- .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, inRecordCoder));
+ .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, inRecordCoder));
//2. apply a GroupByKey.
- PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream
- .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
- .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder,
- IterableCoder.<BeamSqlRow>of(inRecordCoder)));
+ PCollection<KV<BeamRecord, Iterable<BeamRecord>>> groupedStream = exGroupByStream
+ .apply("groupBy", GroupByKey.<BeamRecord, BeamRecord>create())
+ .setCoder(KvCoder.<BeamRecord, Iterable<BeamRecord>>of(keyCoder,
+ IterableCoder.<BeamRecord>of(inRecordCoder)));
//3. run aggregation functions
- PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation",
- Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
+ PCollection<KV<BeamRecord, BeamRecord>> aggregatedStream = groupedStream.apply("aggregation",
+ Combine.<BeamRecord, BeamRecord, BeamRecord>groupedValues(
new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType)))
- .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder));
+ .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, aggCoder));
//4. flat KV to a single record
- PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord",
+ PCollection<BeamRecord> mergedStream = aggregatedStream.apply("mergeRecord",
ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1)));
mergedStream.setCoder(outRecordCoder);
@@ -332,10 +332,10 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
* Coders used in aggregation steps.
*/
private void prepareTypeAndCoder() {
- inRecordCoder = new BeamSqlRowCoder(inputRowType);
+ inRecordCoder = inputRowType.getRecordCoder();
keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
- keyCoder = new BeamSqlRowCoder(keyType);
+ keyCoder = keyType.getRecordCoder();
aggPartType = initTypeOfSqlRow(
Arrays.asList(KV.of("count", SqlTypeName.BIGINT),
@@ -360,35 +360,35 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
));
- aggCoder = new BeamSqlRowCoder(aggPartType);
+ aggCoder = aggPartType.getRecordCoder();
outputType = prepareFinalRowType();
- outRecordCoder = new BeamSqlRowCoder(outputType);
+ outRecordCoder = outputType.getRecordCoder();
}
/**
* expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
*/
- private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationGroupByKeyFn() {
+ private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationGroupByKeyFn() {
return Arrays.asList(
- KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
+ KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
inputRows.get(0)),
- KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
+ KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
inputRows.get(1)),
- KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
+ KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
inputRows.get(2)),
- KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
+ KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
inputRows.get(3)));
}
/**
* expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
*/
- private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationCombineFn()
+ private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationCombineFn()
throws ParseException {
return Arrays.asList(
- KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
- new BeamSqlRow(aggPartType, Arrays.<Object>asList(
+ KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
+ new BeamRecord(aggPartType, Arrays.<Object>asList(
4L,
10000L, 2500L, 4000L, 1000L,
(short) 10, (short) 2, (short) 4, (short) 1,
@@ -404,7 +404,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
/**
* Row type of final output row.
*/
- private BeamSqlRowType prepareFinalRowType() {
+ private BeamSqlRecordType prepareFinalRowType() {
FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
List<KV<String, SqlTypeName>> columnMetadata =
Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
@@ -438,8 +438,8 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
/**
* expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}.
*/
- private BeamSqlRow prepareResultOfMergeAggregationRecord() throws ParseException {
- return new BeamSqlRow(outputType, Arrays.<Object>asList(
+ private BeamRecord prepareResultOfMergeAggregationRecord() throws ParseException {
+ return new BeamRecord(outputType, Arrays.<Object>asList(
1, 4L,
10000L, 2500L, 4000L, 1000L,
(short) 10, (short) 2, (short) 4, (short) 1,
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java
index af7ec23..e31463b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java
@@ -24,8 +24,8 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -38,8 +38,8 @@ import org.junit.BeforeClass;
public class BeamTransformBaseTest {
public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- public static BeamSqlRowType inputRowType;
- public static List<BeamSqlRow> inputRows;
+ public static BeamSqlRecordType inputRowType;
+ public static List<BeamRecord> inputRows;
@BeforeClass
public static void prepareInput() throws NumberFormatException, ParseException{
@@ -68,7 +68,7 @@ public class BeamTransformBaseTest {
/**
* create a {@code BeamSqlRowType} for given column metadata.
*/
- public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
+ public static BeamSqlRecordType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
for (KV<String, SqlTypeName> cm : columnMetadata) {
builder.add(cm.getKey(), cm.getValue());
@@ -79,7 +79,7 @@ public class BeamTransformBaseTest {
/**
* Create an empty row with given column metadata.
*/
- public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
+ public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
return initBeamSqlRow(columnMetadata, Arrays.asList());
}
@@ -87,11 +87,11 @@ public class BeamTransformBaseTest {
* Create a row with given column metadata, and values for each column.
*
*/
- public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
+ public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
List<Object> rowValues){
- BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata);
+ BeamSqlRecordType rowType = initTypeOfSqlRow(columnMetadata);
- return new BeamSqlRow(rowType, rowValues);
+ return new BeamRecord(rowType, rowValues);
}
}