You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/09 16:41:00 UTC
[2/3] beam git commit: [BEAM-2745] add
BeamRecordSqlType.getFieldTypeByIndex()
[BEAM-2745] add BeamRecordSqlType.getFieldTypeByIndex()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c76129a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c76129a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c76129a
Branch: refs/heads/DSL_SQL
Commit: 7c76129a02c19595b257cff6efdc1fd0e637815d
Parents: 6628674
Author: James Xu <xu...@gmail.com>
Authored: Tue Aug 8 14:53:18 2017 +0800
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Aug 9 09:34:10 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/values/BeamRecord.java | 21 ++-
.../apache/beam/sdk/values/BeamRecordType.java | 25 +--
.../apache/beam/sdk/extensions/sql/BeamSql.java | 4 +-
.../beam/sdk/extensions/sql/BeamSqlEnv.java | 6 +-
.../extensions/sql/example/BeamSqlExample.java | 4 +-
.../sql/impl/rel/BeamAggregationRel.java | 16 +-
.../extensions/sql/impl/rel/BeamJoinRel.java | 10 +-
.../extensions/sql/impl/rel/BeamValuesRel.java | 6 +-
.../transform/BeamAggregationTransforms.java | 25 +--
.../sql/impl/transform/BeamJoinTransforms.java | 22 +--
.../sql/impl/transform/BeamSqlProjectFn.java | 8 +-
.../extensions/sql/impl/utils/CalciteUtils.java | 16 +-
.../extensions/sql/schema/BaseBeamTable.java | 6 +-
.../sql/schema/BeamPCollectionTable.java | 4 +-
.../sql/schema/BeamRecordSqlType.java | 185 +++++++++++++++++++
.../sql/schema/BeamSqlRecordHelper.java | 4 +-
.../sql/schema/BeamSqlRecordType.java | 175 ------------------
.../sdk/extensions/sql/schema/BeamSqlTable.java | 2 +-
.../extensions/sql/schema/BeamTableUtils.java | 14 +-
.../sql/schema/kafka/BeamKafkaCSVTable.java | 14 +-
.../sql/schema/kafka/BeamKafkaTable.java | 6 +-
.../sql/schema/text/BeamTextCSVTable.java | 6 +-
.../schema/text/BeamTextCSVTableIOReader.java | 6 +-
.../schema/text/BeamTextCSVTableIOWriter.java | 6 +-
.../sql/schema/text/BeamTextTable.java | 4 +-
.../sql/BeamSqlDslAggregationTest.java | 14 +-
.../beam/sdk/extensions/sql/BeamSqlDslBase.java | 6 +-
.../sdk/extensions/sql/BeamSqlDslJoinTest.java | 10 +-
.../extensions/sql/BeamSqlDslProjectTest.java | 10 +-
.../extensions/sql/BeamSqlDslUdfUdafTest.java | 6 +-
.../beam/sdk/extensions/sql/TestUtils.java | 14 +-
.../interpreter/BeamSqlFnExecutorTestBase.java | 4 +-
...mSqlBuiltinFunctionsIntegrationTestBase.java | 6 +-
...amSqlComparisonOperatorsIntegrationTest.java | 4 +-
.../extensions/sql/mock/MockedBoundedTable.java | 6 +-
.../sdk/extensions/sql/mock/MockedTable.java | 4 +-
.../sql/mock/MockedUnboundedTable.java | 4 +-
.../sql/schema/BeamSqlRowCoderTest.java | 2 +-
.../sql/schema/kafka/BeamKafkaCSVTableTest.java | 4 +-
.../sql/schema/text/BeamTextCSVTableTest.java | 4 +-
.../transform/BeamAggregationTransformTest.java | 10 +-
.../schema/transform/BeamTransformBaseTest.java | 8 +-
42 files changed, 368 insertions(+), 343 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index 6e4bd4c..a3ede3c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -37,7 +37,20 @@ public class BeamRecord implements Serializable {
private List<Object> dataValues;
private BeamRecordType dataType;
- public BeamRecord(BeamRecordType dataType, List<Object> rawdataValues) {
+ /**
+ * Creates a BeamRecord.
+ * @param dataType type of the record
+ * @param rawDataValues values of the record, record's size must match size of
+ * the {@code BeamRecordType}, or can be null, if it is null
+ * then every field is null.
+ */
+ public BeamRecord(BeamRecordType dataType, List<Object> rawDataValues) {
+ if (dataType.getFieldNames().size() != rawDataValues.size()) {
+ throw new IllegalArgumentException(
+ "Field count in BeamRecordType(" + dataType.getFieldNames().size()
+ + ") and rawDataValues(" + rawDataValues.size() + ") must match!");
+ }
+
this.dataType = dataType;
this.dataValues = new ArrayList<>(dataType.size());
@@ -46,7 +59,7 @@ public class BeamRecord implements Serializable {
}
for (int idx = 0; idx < dataType.size(); ++idx) {
- addField(idx, rawdataValues.get(idx));
+ addField(idx, rawDataValues.get(idx));
}
}
@@ -60,7 +73,7 @@ public class BeamRecord implements Serializable {
}
public Object getFieldValue(String fieldName) {
- return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+ return getFieldValue(dataType.getFieldNames().indexOf(fieldName));
}
public Byte getByte(String fieldName) {
@@ -179,7 +192,7 @@ public class BeamRecord implements Serializable {
StringBuilder sb = new StringBuilder();
for (int idx = 0; idx < size(); ++idx) {
sb.append(
- String.format(",%s=%s", getDataType().getFieldsName().get(idx), getFieldValue(idx)));
+ String.format(",%s=%s", getDataType().getFieldNames().get(idx), getFieldValue(idx)));
}
return sb.substring(1);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
index 3b20b50..6ab783c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.values;
+import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
@@ -28,12 +29,12 @@ import org.apache.beam.sdk.coders.Coder;
*/
@Experimental
public class BeamRecordType implements Serializable{
- private List<String> fieldsName;
- private List<Coder> fieldsCoder;
+ private List<String> fieldNames;
+ private List<Coder> fieldCoders;
- public BeamRecordType(List<String> fieldsName, List<Coder> fieldsCoder) {
- this.fieldsName = fieldsName;
- this.fieldsCoder = fieldsCoder;
+ public BeamRecordType(List<String> fieldNames, List<Coder> fieldCoders) {
+ this.fieldNames = fieldNames;
+ this.fieldCoders = fieldCoders;
}
/**
@@ -49,22 +50,22 @@ public class BeamRecordType implements Serializable{
* Get the coder for {@link BeamRecordCoder}.
*/
public BeamRecordCoder getRecordCoder(){
- return BeamRecordCoder.of(this, fieldsCoder);
+ return BeamRecordCoder.of(this, fieldCoders);
}
- public List<String> getFieldsName(){
- return fieldsName;
+ public List<String> getFieldNames(){
+ return ImmutableList.copyOf(fieldNames);
}
- public String getFieldByIndex(int index){
- return fieldsName.get(index);
+ public String getFieldNameByIndex(int index){
+ return fieldNames.get(index);
}
public int findIndexOfField(String fieldName){
- return fieldsName.indexOf(fieldName);
+ return fieldNames.indexOf(fieldName);
}
public int size(){
- return fieldsName.size();
+ return fieldNames.size();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index d0a6360..ac617ad 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.BeamRecordCoder;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
import org.apache.beam.sdk.transforms.PTransform;
@@ -179,7 +179,7 @@ public class BeamSql {
getSqlEnv().registerTable(sourceTag.getId(),
new BeamPCollectionTable(sourceStream,
- (BeamSqlRecordType) sourceCoder.getRecordType()));
+ (BeamRecordSqlType) sourceCoder.getRecordType()));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
index 3c5eb36..4d21425 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
import org.apache.calcite.DataContext;
@@ -84,8 +84,8 @@ public class BeamSqlEnv implements Serializable{
}
private static class BeamCalciteTable implements ScannableTable, Serializable {
- private BeamSqlRecordType beamSqlRowType;
- public BeamCalciteTable(BeamSqlRecordType beamSqlRowType) {
+ private BeamRecordSqlType beamSqlRowType;
+ public BeamCalciteTable(BeamRecordSqlType beamSqlRowType) {
this.beamSqlRowType = beamSqlRowType;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index acb5943..3a46acc 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -22,7 +22,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.BeamSql;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
@@ -53,7 +53,7 @@ class BeamSqlExample {
//define the input row format
List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
- BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
+ BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
BeamRecord row = new BeamRecord(type, 1, "row", 1.0);
//create a source PCollection with Create.of();
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index d91b484..4b557f9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
@@ -119,23 +119,23 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
/**
* Type of sub-rowrecord used as Group-By keys.
*/
- private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
- BeamSqlRecordType inputRowType = CalciteUtils.toBeamRowType(relDataType);
+ private BeamRecordSqlType exKeyFieldsSchema(RelDataType relDataType) {
+ BeamRecordSqlType inputRowType = CalciteUtils.toBeamRowType(relDataType);
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (int i : groupSet.asList()) {
if (i != windowFieldIdx) {
- fieldNames.add(inputRowType.getFieldsName().get(i));
- fieldTypes.add(inputRowType.getFieldsType().get(i));
+ fieldNames.add(inputRowType.getFieldNameByIndex(i));
+ fieldTypes.add(inputRowType.getFieldTypeByIndex(i));
}
}
- return BeamSqlRecordType.create(fieldNames, fieldTypes);
+ return BeamRecordSqlType.create(fieldNames, fieldTypes);
}
/**
* Type of sub-rowrecord, that represents the list of aggregation fields.
*/
- private BeamSqlRecordType exAggFieldsSchema() {
+ private BeamRecordSqlType exAggFieldsSchema() {
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (AggregateCall ac : getAggCallList()) {
@@ -143,7 +143,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
}
- return BeamSqlRecordType.create(fieldNames, fieldTypes);
+ return BeamRecordSqlType.create(fieldNames, fieldTypes);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 2bd15b3..9dceb25 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -97,7 +97,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
BeamSqlEnv sqlEnv)
throws Exception {
BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
- BeamSqlRecordType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
+ BeamRecordSqlType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
PCollection<BeamRecord> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
@@ -117,9 +117,9 @@ public class BeamJoinRel extends Join implements BeamRelNode {
List<Integer> types = new ArrayList<>(pairs.size());
for (int i = 0; i < pairs.size(); i++) {
names.add("c" + i);
- types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
+ types.add(leftRowType.getFieldTypeByIndex(pairs.get(i).getKey()));
}
- BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types);
+ BeamRecordSqlType extractKeyRowType = BeamRecordSqlType.create(names, types);
Coder extractKeyRowCoder = extractKeyRowType.getRecordCoder();
@@ -255,7 +255,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
}
private BeamRecord buildNullRow(BeamRelNode relNode) {
- BeamSqlRecordType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
+ BeamRecordSqlType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
return new BeamRecord(leftType, Collections.nCopies(leftType.size(), null));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index 1d666ca..fde002e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.BeamRecord;
@@ -63,12 +63,12 @@ public class BeamValuesRel extends Values implements BeamRelNode {
throw new IllegalStateException("Values with empty tuples!");
}
- BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
+ BeamRecordSqlType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
for (ImmutableList<RexLiteral> tuple : tuples) {
List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.size());
for (int i = 0; i < tuple.size(); i++) {
fieldsValue.add(BeamTableUtils.autoCastField(
- beamSQLRowType.getFieldsType().get(i), tuple.get(i).getValue()));
+ beamSQLRowType.getFieldTypeByIndex(i), tuple.get(i).getValue()));
}
rows.add(new BeamRecord(beamSQLRowType, fieldsValue));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index c6a5d26..0f90bee 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
@@ -59,11 +59,11 @@ public class BeamAggregationTransforms implements Serializable{
* Merge KV to single record.
*/
public static class MergeAggregationRecord extends DoFn<KV<BeamRecord, BeamRecord>, BeamRecord> {
- private BeamSqlRecordType outRowType;
+ private BeamRecordSqlType outRowType;
private List<String> aggFieldNames;
private int windowStartFieldIdx;
- public MergeAggregationRecord(BeamSqlRecordType outRowType, List<AggregateCall> aggList
+ public MergeAggregationRecord(BeamRecordSqlType outRowType, List<AggregateCall> aggList
, int windowStartFieldIdx) {
this.outRowType = outRowType;
this.aggFieldNames = new ArrayList<>();
@@ -75,10 +75,11 @@ public class BeamAggregationTransforms implements Serializable{
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
- List<Object> fieldValues = new ArrayList<>();
KV<BeamRecord, BeamRecord> kvRecord = c.element();
+ List<Object> fieldValues = new ArrayList<>();
fieldValues.addAll(kvRecord.getKey().getDataValues());
fieldValues.addAll(kvRecord.getValue().getDataValues());
+
if (windowStartFieldIdx != -1) {
fieldValues.add(windowStartFieldIdx, ((IntervalWindow) window).start().toDate());
}
@@ -106,7 +107,7 @@ public class BeamAggregationTransforms implements Serializable{
@Override
public BeamRecord apply(BeamRecord input) {
- BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input));
+ BeamRecordSqlType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input));
List<Object> fieldValues = new ArrayList<>(groupByKeys.size());
for (int idx = 0; idx < groupByKeys.size(); ++idx) {
@@ -117,14 +118,14 @@ public class BeamAggregationTransforms implements Serializable{
return keyOfRecord;
}
- private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
+ private BeamRecordSqlType exTypeOfKeyRecord(BeamRecordSqlType dataType) {
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (int idx : groupByKeys) {
- fieldNames.add(dataType.getFieldsName().get(idx));
- fieldTypes.add(dataType.getFieldsType().get(idx));
+ fieldNames.add(dataType.getFieldNameByIndex(idx));
+ fieldTypes.add(dataType.getFieldTypeByIndex(idx));
}
- return BeamSqlRecordType.create(fieldNames, fieldTypes);
+ return BeamRecordSqlType.create(fieldNames, fieldTypes);
}
}
@@ -152,10 +153,10 @@ public class BeamAggregationTransforms implements Serializable{
extends CombineFn<BeamRecord, AggregationAccumulator, BeamRecord> {
private List<BeamSqlUdaf> aggregators;
private List<BeamSqlExpression> sourceFieldExps;
- private BeamSqlRecordType finalRowType;
+ private BeamRecordSqlType finalRowType;
public AggregationAdaptor(List<AggregateCall> aggregationCalls,
- BeamSqlRecordType sourceRowType) {
+ BeamRecordSqlType sourceRowType) {
aggregators = new ArrayList<>();
sourceFieldExps = new ArrayList<>();
List<String> outFieldsName = new ArrayList<>();
@@ -204,7 +205,7 @@ public class BeamAggregationTransforms implements Serializable{
break;
}
}
- finalRowType = BeamSqlRecordType.create(outFieldsName, outFieldsType);
+ finalRowType = BeamRecordSqlType.create(outFieldsName, outFieldsType);
}
@Override
public AggregationAccumulator createAccumulator() {
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 8f34704..9a48c53 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.BeamRecord;
@@ -58,12 +58,12 @@ public class BeamJoinTransforms {
for (int i = 0; i < joinColumns.size(); i++) {
names.add("c" + i);
types.add(isLeft
- ? BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType()
- .get(joinColumns.get(i).getKey())
- : BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType()
- .get(joinColumns.get(i).getValue()));
+ ? BeamSqlRecordHelper.getSqlRecordType(input).getFieldTypeByIndex(
+ joinColumns.get(i).getKey())
+ : BeamSqlRecordHelper.getSqlRecordType(input).getFieldTypeByIndex(
+ joinColumns.get(i).getValue()));
}
- BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+ BeamRecordSqlType type = BeamRecordSqlType.create(names, types);
// build the row
List<Object> fieldValues = new ArrayList<>(joinColumns.size());
@@ -146,13 +146,13 @@ public class BeamJoinTransforms {
BeamRecord rightRow) {
// build the type
List<String> names = new ArrayList<>(leftRow.size() + rightRow.size());
- names.addAll(leftRow.getDataType().getFieldsName());
- names.addAll(rightRow.getDataType().getFieldsName());
+ names.addAll(leftRow.getDataType().getFieldNames());
+ names.addAll(rightRow.getDataType().getFieldNames());
List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
- types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldsType());
- types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldsType());
- BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+ types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldTypes());
+ types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldTypes());
+ BeamRecordSqlType type = BeamRecordSqlType.create(names, types);
List<Object> fieldValues = new ArrayList<>(leftRow.getDataValues());
fieldValues.addAll(rightRow.getDataValues());
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
index 34d6dbb..aac38c7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
@@ -21,7 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -35,10 +35,10 @@ import org.apache.beam.sdk.values.BeamRecord;
public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> {
private String stepName;
private BeamSqlExpressionExecutor executor;
- private BeamSqlRecordType outputRowType;
+ private BeamRecordSqlType outputRowType;
public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
- BeamSqlRecordType outputRowType) {
+ BeamRecordSqlType outputRowType) {
super();
this.stepName = stepName;
this.executor = executor;
@@ -57,7 +57,7 @@ public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> {
List<Object> fieldsValue = new ArrayList<>(results.size());
for (int idx = 0; idx < results.size(); ++idx) {
fieldsValue.add(
- BeamTableUtils.autoCastField(outputRowType.getFieldsType().get(idx), results.get(idx)));
+ BeamTableUtils.autoCastField(outputRowType.getFieldTypeByIndex(idx), results.get(idx)));
}
BeamRecord outRow = new BeamRecord(outputRowType, fieldsValue);
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index bf96e85..8b6206b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -78,33 +78,33 @@ public class CalciteUtils {
/**
* Get the {@code SqlTypeName} for the specified column of a table.
*/
- public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) {
- return toCalciteType(schema.getFieldsType().get(index));
+ public static SqlTypeName getFieldType(BeamRecordSqlType schema, int index) {
+ return toCalciteType(schema.getFieldTypeByIndex(index));
}
/**
* Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
*/
- public static BeamSqlRecordType toBeamRowType(RelDataType tableInfo) {
+ public static BeamRecordSqlType toBeamRowType(RelDataType tableInfo) {
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (RelDataTypeField f : tableInfo.getFieldList()) {
fieldNames.add(f.getName());
fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
}
- return BeamSqlRecordType.create(fieldNames, fieldTypes);
+ return BeamRecordSqlType.create(fieldNames, fieldTypes);
}
/**
* Create an instance of {@code RelDataType} so it can be used to create a table.
*/
- public static RelProtoDataType toCalciteRowType(final BeamSqlRecordType that) {
+ public static RelProtoDataType toCalciteRowType(final BeamRecordSqlType that) {
return new RelProtoDataType() {
@Override
public RelDataType apply(RelDataTypeFactory a) {
RelDataTypeFactory.FieldInfoBuilder builder = a.builder();
- for (int idx = 0; idx < that.getFieldsName().size(); ++idx) {
- builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx)));
+ for (int idx = 0; idx < that.getFieldNames().size(); ++idx) {
+ builder.add(that.getFieldNameByIndex(idx), toCalciteType(that.getFieldTypeByIndex(idx)));
}
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
index 68b120e..0564820 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
@@ -23,12 +23,12 @@ import java.io.Serializable;
* Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
*/
public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
- protected BeamSqlRecordType beamSqlRowType;
- public BaseBeamTable(BeamSqlRecordType beamSqlRowType) {
+ protected BeamRecordSqlType beamSqlRowType;
+ public BaseBeamTable(BeamRecordSqlType beamSqlRowType) {
this.beamSqlRowType = beamSqlRowType;
}
- @Override public BeamSqlRecordType getRowType() {
+ @Override public BeamRecordSqlType getRowType() {
return beamSqlRowType;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
index 68905b5..9d9988e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
@@ -32,12 +32,12 @@ public class BeamPCollectionTable extends BaseBeamTable {
private BeamIOType ioType;
private transient PCollection<BeamRecord> upstream;
- protected BeamPCollectionTable(BeamSqlRecordType beamSqlRowType) {
+ protected BeamPCollectionTable(BeamRecordSqlType beamSqlRowType) {
super(beamSqlRowType);
}
public BeamPCollectionTable(PCollection<BeamRecord> upstream,
- BeamSqlRecordType beamSqlRowType){
+ BeamRecordSqlType beamSqlRowType){
this(beamSqlRowType);
ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
new file mode 100644
index 0000000..1845988
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.BeamRecordType;
+
+/**
+ * Type provider for {@link BeamRecord} with SQL types.
+ *
+ * <p>Limited SQL types are supported now, visit
+ * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
+ * for more details.
+ *
+ */
+public class BeamRecordSqlType extends BeamRecordType {
+ private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+ static {
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+ }
+
+ public List<Integer> fieldTypes;
+
+ protected BeamRecordSqlType(List<String> fieldsName, List<Coder> fieldsCoder) {
+ super(fieldsName, fieldsCoder);
+ }
+
+ private BeamRecordSqlType(List<String> fieldsName, List<Integer> fieldTypes
+ , List<Coder> fieldsCoder) {
+ super(fieldsName, fieldsCoder);
+ this.fieldTypes = fieldTypes;
+ }
+
+ public static BeamRecordSqlType create(List<String> fieldNames,
+ List<Integer> fieldTypes) {
+ if (fieldNames.size() != fieldTypes.size()) {
+ throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match.");
+ }
+ List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
+ for (int idx = 0; idx < fieldTypes.size(); ++idx) {
+ switch (fieldTypes.get(idx)) {
+ case Types.INTEGER:
+ fieldCoders.add(BigEndianIntegerCoder.of());
+ break;
+ case Types.SMALLINT:
+ fieldCoders.add(ShortCoder.of());
+ break;
+ case Types.TINYINT:
+ fieldCoders.add(ByteCoder.of());
+ break;
+ case Types.DOUBLE:
+ fieldCoders.add(DoubleCoder.of());
+ break;
+ case Types.FLOAT:
+ fieldCoders.add(FloatCoder.of());
+ break;
+ case Types.DECIMAL:
+ fieldCoders.add(BigDecimalCoder.of());
+ break;
+ case Types.BIGINT:
+ fieldCoders.add(BigEndianLongCoder.of());
+ break;
+ case Types.VARCHAR:
+ case Types.CHAR:
+ fieldCoders.add(StringUtf8Coder.of());
+ break;
+ case Types.TIME:
+ fieldCoders.add(TimeCoder.of());
+ break;
+ case Types.DATE:
+ case Types.TIMESTAMP:
+ fieldCoders.add(DateCoder.of());
+ break;
+ case Types.BOOLEAN:
+ fieldCoders.add(BooleanCoder.of());
+ break;
+
+ default:
+ throw new UnsupportedOperationException(
+ "Data type: " + fieldTypes.get(idx) + " not supported yet!");
+ }
+ }
+ return new BeamRecordSqlType(fieldNames, fieldTypes, fieldCoders);
+ }
+
+ @Override
+ public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
+ if (null == fieldValue) {// no need to do type check for NULL value
+ return;
+ }
+
+ int fieldType = fieldTypes.get(index);
+ Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
+ if (javaClazz == null) {
+ throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
+ }
+
+ if (!fieldValue.getClass().equals(javaClazz)) {
+ throw new IllegalArgumentException(
+ String.format("[%s](%s) doesn't match type [%s]",
+ fieldValue, fieldValue.getClass(), fieldType)
+ );
+ }
+ }
+
+ public List<Integer> getFieldTypes() {
+ return fieldTypes;
+ }
+
+ public Integer getFieldTypeByIndex(int index){
+ return fieldTypes.get(index);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj instanceof BeamRecordSqlType) {
+ BeamRecordSqlType ins = (BeamRecordSqlType) obj;
+ return fieldTypes.equals(ins.getFieldTypes()) && getFieldNames().equals(ins.getFieldNames());
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * getFieldNames().hashCode() + getFieldTypes().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "BeamRecordSqlType [fieldNames=" + getFieldNames()
+ + ", fieldTypes=" + fieldTypes + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
index b910c84..89eefd1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
@@ -39,8 +39,8 @@ import org.apache.beam.sdk.values.BeamRecord;
@Experimental
public class BeamSqlRecordHelper {
- public static BeamSqlRecordType getSqlRecordType(BeamRecord record) {
- return (BeamSqlRecordType) record.getDataType();
+ public static BeamRecordSqlType getSqlRecordType(BeamRecord record) {
+ return (BeamRecordSqlType) record.getDataType();
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
deleted file mode 100644
index b7c7438..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.ByteCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.BeamRecordType;
-
-/**
- * Type provider for {@link BeamRecord} with SQL types.
- *
- * <p>Limited SQL types are supported now, visit
- * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
- * for more details.
- *
- */
-public class BeamSqlRecordType extends BeamRecordType {
- private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
- static {
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
- }
-
- public List<Integer> fieldsType;
-
- protected BeamSqlRecordType(List<String> fieldsName, List<Coder> fieldsCoder) {
- super(fieldsName, fieldsCoder);
- }
-
- private BeamSqlRecordType(List<String> fieldsName, List<Integer> fieldsType
- , List<Coder> fieldsCoder) {
- super(fieldsName, fieldsCoder);
- this.fieldsType = fieldsType;
- }
-
- public static BeamSqlRecordType create(List<String> fieldNames,
- List<Integer> fieldTypes) {
- if (fieldNames.size() != fieldTypes.size()) {
- throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match.");
- }
- List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
- for (int idx = 0; idx < fieldTypes.size(); ++idx) {
- switch (fieldTypes.get(idx)) {
- case Types.INTEGER:
- fieldCoders.add(BigEndianIntegerCoder.of());
- break;
- case Types.SMALLINT:
- fieldCoders.add(ShortCoder.of());
- break;
- case Types.TINYINT:
- fieldCoders.add(ByteCoder.of());
- break;
- case Types.DOUBLE:
- fieldCoders.add(DoubleCoder.of());
- break;
- case Types.FLOAT:
- fieldCoders.add(FloatCoder.of());
- break;
- case Types.DECIMAL:
- fieldCoders.add(BigDecimalCoder.of());
- break;
- case Types.BIGINT:
- fieldCoders.add(BigEndianLongCoder.of());
- break;
- case Types.VARCHAR:
- case Types.CHAR:
- fieldCoders.add(StringUtf8Coder.of());
- break;
- case Types.TIME:
- fieldCoders.add(TimeCoder.of());
- break;
- case Types.DATE:
- case Types.TIMESTAMP:
- fieldCoders.add(DateCoder.of());
- break;
- case Types.BOOLEAN:
- fieldCoders.add(BooleanCoder.of());
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Data type: " + fieldTypes.get(idx) + " not supported yet!");
- }
- }
- return new BeamSqlRecordType(fieldNames, fieldTypes, fieldCoders);
- }
-
- @Override
- public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
- if (null == fieldValue) {// no need to do type check for NULL value
- return;
- }
-
- int fieldType = fieldsType.get(index);
- Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
- if (javaClazz == null) {
- throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
- }
-
- if (!fieldValue.getClass().equals(javaClazz)) {
- throw new IllegalArgumentException(
- String.format("[%s](%s) doesn't match type [%s]",
- fieldValue, fieldValue.getClass(), fieldType)
- );
- }
- }
-
- public List<Integer> getFieldsType() {
- return fieldsType;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj != null && obj instanceof BeamSqlRecordType) {
- BeamSqlRecordType ins = (BeamSqlRecordType) obj;
- return fieldsType.equals(ins.getFieldsType()) && getFieldsName().equals(ins.getFieldsName());
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return 31 * getFieldsName().hashCode() + getFieldsType().hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
index b370d9d..828ac43 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
@@ -49,5 +49,5 @@ public interface BeamSqlTable {
/**
* Get the schema info of the table.
*/
- BeamSqlRecordType getRowType();
+ BeamRecordSqlType getRowType();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
index 19d3e39..99f9522 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -40,27 +40,27 @@ public final class BeamTableUtils {
public static BeamRecord csvLine2BeamSqlRow(
CSVFormat csvFormat,
String line,
- BeamSqlRecordType beamSqlRowType) {
- List<Object> fieldsValue = new ArrayList<>(beamSqlRowType.size());
+ BeamRecordSqlType beamRecordSqlType) {
+ List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.size());
try (StringReader reader = new StringReader(line)) {
CSVParser parser = csvFormat.parse(reader);
CSVRecord rawRecord = parser.getRecords().get(0);
- if (rawRecord.size() != beamSqlRowType.size()) {
+ if (rawRecord.size() != beamRecordSqlType.size()) {
throw new IllegalArgumentException(String.format(
"Expect %d fields, but actually %d",
- beamSqlRowType.size(), rawRecord.size()
+ beamRecordSqlType.size(), rawRecord.size()
));
} else {
- for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
+ for (int idx = 0; idx < beamRecordSqlType.size(); idx++) {
String raw = rawRecord.get(idx);
- fieldsValue.add(autoCastField(beamSqlRowType.getFieldsType().get(idx), raw));
+ fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw));
}
}
} catch (IOException e) {
throw new IllegalArgumentException("decodeRecord failed!", e);
}
- return new BeamRecord(beamSqlRowType, fieldsValue);
+ return new BeamRecord(beamRecordSqlType, fieldsValue);
}
public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
index f137379..8c7e6f0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.extensions.sql.schema.kafka;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -34,12 +34,12 @@ import org.apache.commons.csv.CSVFormat;
*/
public class BeamKafkaCSVTable extends BeamKafkaTable {
private CSVFormat csvFormat;
- public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
+ public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
List<String> topics) {
this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
}
- public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
+ public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
List<String> topics, CSVFormat format) {
super(beamSqlRowType, bootstrapServers, topics);
this.csvFormat = format;
@@ -63,9 +63,9 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
*/
public static class CsvRecorderDecoder
extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> {
- private BeamSqlRecordType rowType;
+ private BeamRecordSqlType rowType;
private CSVFormat format;
- public CsvRecorderDecoder(BeamSqlRecordType rowType, CSVFormat format) {
+ public CsvRecorderDecoder(BeamRecordSqlType rowType, CSVFormat format) {
this.rowType = rowType;
this.format = format;
}
@@ -88,9 +88,9 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
*/
public static class CsvRecorderEncoder
extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> {
- private BeamSqlRecordType rowType;
+ private BeamRecordSqlType rowType;
private CSVFormat format;
- public CsvRecorderEncoder(BeamSqlRecordType rowType, CSVFormat format) {
+ public CsvRecorderEncoder(BeamRecordSqlType rowType, CSVFormat format) {
this.rowType = rowType;
this.format = format;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
index fac57bf..1d57839 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.BeamRecord;
@@ -48,11 +48,11 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
private List<String> topics;
private Map<String, Object> configUpdates;
- protected BeamKafkaTable(BeamSqlRecordType beamSqlRowType) {
+ protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) {
super(beamSqlRowType);
}
- public BeamKafkaTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
+ public BeamKafkaTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
List<String> topics) {
super(beamSqlRowType);
this.bootstrapServers = bootstrapServers;
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
index 0ec418c..79e56e6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.extensions.sql.schema.text;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.BeamRecord;
@@ -46,11 +46,11 @@ public class BeamTextCSVTable extends BeamTextTable {
/**
* CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
*/
- public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern) {
+ public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern) {
this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
}
- public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern,
+ public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern,
CSVFormat csvFormat) {
super(beamSqlRowType, filePattern);
this.csvFormat = csvFormat;
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
index ecb77e0..018dae5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.extensions.sql.schema.text;
import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -35,10 +35,10 @@ public class BeamTextCSVTableIOReader
extends PTransform<PCollection<String>, PCollection<BeamRecord>>
implements Serializable {
private String filePattern;
- protected BeamSqlRecordType beamSqlRowType;
+ protected BeamRecordSqlType beamSqlRowType;
protected CSVFormat csvFormat;
- public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRowType, String filePattern,
+ public BeamTextCSVTableIOReader(BeamRecordSqlType beamSqlRowType, String filePattern,
CSVFormat csvFormat) {
this.filePattern = filePattern;
this.beamSqlRowType = beamSqlRowType;
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
index c616973..53eb382 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.extensions.sql.schema.text;
import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
@@ -36,10 +36,10 @@ import org.apache.commons.csv.CSVFormat;
public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone>
implements Serializable {
private String filePattern;
- protected BeamSqlRecordType beamSqlRowType;
+ protected BeamRecordSqlType beamSqlRowType;
protected CSVFormat csvFormat;
- public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRowType, String filePattern,
+ public BeamTextCSVTableIOWriter(BeamRecordSqlType beamSqlRowType, String filePattern,
CSVFormat csvFormat) {
this.filePattern = filePattern;
this.beamSqlRowType = beamSqlRowType;
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
index 4284366..80e81aa 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.schema.text;
import java.io.Serializable;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
/**
* {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
protected String filePattern;
- protected BeamTextTable(BeamSqlRecordType beamSqlRowType, String filePattern) {
+ protected BeamTextTable(BeamRecordSqlType beamSqlRowType, String filePattern) {
super(beamSqlRowType);
this.filePattern = filePattern;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 19ca398..4e74dbb 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql;
import java.sql.Types;
import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
@@ -54,7 +54,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollection<BeamRecord> result =
input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int2", "size"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
BeamRecord record = new BeamRecord(resultType, 0, 4L);
@@ -95,7 +95,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testAggregationFunctions", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(
Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2",
"min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5",
"max5", "min5", "max6", "min6"),
@@ -141,7 +141,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollection<BeamRecord> result =
input.apply("testDistinct", BeamSql.simpleQuery(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
BeamRecord record1 = new BeamRecord(resultType, 1, 1000L);
@@ -179,7 +179,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testTumbleWindow", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
@@ -215,7 +215,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollection<BeamRecord> result =
input.apply("testHopWindow", BeamSql.simpleQuery(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
@@ -254,7 +254,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testSessionWindow", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index 02427ae..ef75ee2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -25,7 +25,7 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
@@ -52,7 +52,7 @@ public class BeamSqlDslBase {
@Rule
public ExpectedException exceptions = ExpectedException.none();
- public static BeamSqlRecordType rowTypeInTableA;
+ public static BeamRecordSqlType rowTypeInTableA;
public static List<BeamRecord> recordsInTableA;
//bounded PCollections
@@ -65,7 +65,7 @@ public class BeamSqlDslBase {
@BeforeClass
public static void prepareClass() throws ParseException {
- rowTypeInTableA = BeamSqlRecordType.create(
+ rowTypeInTableA = BeamRecordSqlType.create(
Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string",
"f_timestamp", "f_int2", "f_decimal"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index d5d0a24..0876dd9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -24,7 +24,7 @@ import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBo
import java.sql.Types;
import java.util.Arrays;
import org.apache.beam.sdk.coders.BeamRecordCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.BeamRecord;
@@ -41,8 +41,8 @@ public class BeamSqlDslJoinTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- private static final BeamSqlRecordType SOURCE_RECORD_TYPE =
- BeamSqlRecordType.create(
+ private static final BeamRecordSqlType SOURCE_RECORD_TYPE =
+ BeamRecordSqlType.create(
Arrays.asList(
"order_id", "site_id", "price"
),
@@ -53,8 +53,8 @@ public class BeamSqlDslJoinTest {
private static final BeamRecordCoder SOURCE_CODER = SOURCE_RECORD_TYPE.getRecordCoder();
- private static final BeamSqlRecordType RESULT_RECORD_TYPE =
- BeamSqlRecordType.create(
+ private static final BeamRecordSqlType RESULT_RECORD_TYPE =
+ BeamRecordSqlType.create(
Arrays.asList(
"order_id", "site_id", "price", "order_id0", "site_id0", "price0"
),
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index c8041a8..46aea99 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql;
import java.sql.Types;
import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
@@ -81,7 +81,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testPartialFields", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
BeamRecord record = new BeamRecord(resultType
@@ -115,7 +115,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
BeamRecord record1 = new BeamRecord(resultType
@@ -158,7 +158,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testPartialFieldsInRows", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
BeamRecord record1 = new BeamRecord(resultType
@@ -201,7 +201,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testLiteralField", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("literal_field"),
Arrays.asList(Types.INTEGER));
BeamRecord record = new BeamRecord(resultType, 1);
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 25e76e9..7302376 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql;
import java.sql.Types;
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
import org.apache.beam.sdk.testing.PAssert;
@@ -39,7 +39,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
*/
@Test
public void testUdaf() throws Exception {
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"),
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int2", "squaresum"),
Arrays.asList(Types.INTEGER, Types.INTEGER));
BeamRecord record = new BeamRecord(resultType, 0, 30);
@@ -67,7 +67,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
*/
@Test
public void testUdf() throws Exception{
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"),
+ BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "cubicvalue"),
Arrays.asList(Types.INTEGER, Types.INTEGER));
BeamRecord record = new BeamRecord(resultType, 2, 8);
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index e9dc88f..aa1fc29 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.BeamRecord;
@@ -69,7 +69,7 @@ public class TestUtils {
* {@code}
*/
public static class RowsBuilder {
- private BeamSqlRecordType type;
+ private BeamRecordSqlType type;
private List<BeamRecord> rows = new ArrayList<>();
/**
@@ -86,7 +86,7 @@ public class TestUtils {
* @args pairs of column type and column names.
*/
public static RowsBuilder of(final Object... args) {
- BeamSqlRecordType beamSQLRowType = buildBeamSqlRowType(args);
+ BeamRecordSqlType beamSQLRowType = buildBeamSqlRowType(args);
RowsBuilder builder = new RowsBuilder();
builder.type = beamSQLRowType;
@@ -103,7 +103,7 @@ public class TestUtils {
* )}</pre>
* @beamSQLRowType the record type.
*/
- public static RowsBuilder of(final BeamSqlRecordType beamSQLRowType) {
+ public static RowsBuilder of(final BeamRecordSqlType beamSQLRowType) {
RowsBuilder builder = new RowsBuilder();
builder.type = beamSQLRowType;
@@ -153,7 +153,7 @@ public class TestUtils {
* )
* }</pre>
*/
- public static BeamSqlRecordType buildBeamSqlRowType(Object... args) {
+ public static BeamRecordSqlType buildBeamSqlRowType(Object... args) {
List<Integer> types = new ArrayList<>();
List<String> names = new ArrayList<>();
@@ -162,7 +162,7 @@ public class TestUtils {
names.add((String) args[i + 1]);
}
- return BeamSqlRecordType.create(names, types);
+ return BeamRecordSqlType.create(names, types);
}
/**
@@ -179,7 +179,7 @@ public class TestUtils {
* )
* }</pre>
*/
- public static List<BeamRecord> buildRows(BeamSqlRecordType type, List args) {
+ public static List<BeamRecord> buildRows(BeamRecordSqlType type, List args) {
List<BeamRecord> rows = new ArrayList<>();
int fieldCount = type.size();
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index 86e2ca4..97905c5 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.Lex;
@@ -57,7 +57,7 @@ public class BeamSqlFnExecutorTestBase {
RelDataTypeSystem.DEFAULT);
public static RelDataType relDataType;
- public static BeamSqlRecordType beamRowType;
+ public static BeamRecordSqlType beamRowType;
public static BeamRecord record;
public static RelBuilder relBuilder;
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index b58a17f..5898e2e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -32,7 +32,7 @@ import java.util.TimeZone;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.BeamRecord;
@@ -62,7 +62,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
public final TestPipeline pipeline = TestPipeline.create();
protected PCollection<BeamRecord> getTestPCollection() {
- BeamSqlRecordType type = BeamSqlRecordType.create(
+ BeamRecordSqlType type = BeamRecordSqlType.create(
Arrays.asList("ts", "c_tinyint", "c_smallint",
"c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
"c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
@@ -155,7 +155,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
PCollection<BeamRecord> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder
- .of(BeamSqlRecordType.create(names, types))
+ .of(BeamRecordSqlType.create(names, types))
.addRows(values)
.getRows()
);
http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
index 3569e31..4ce2f45 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
@@ -22,7 +22,7 @@ import java.math.BigDecimal;
import java.sql.Types;
import java.util.Arrays;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Test;
@@ -282,7 +282,7 @@ public class BeamSqlComparisonOperatorsIntegrationTest
}
@Override protected PCollection<BeamRecord> getTestPCollection() {
- BeamSqlRecordType type = BeamSqlRecordType.create(
+ BeamRecordSqlType type = BeamRecordSqlType.create(
Arrays.asList(
"c_tinyint_0", "c_tinyint_1", "c_tinyint_2",
"c_smallint_0", "c_smallint_1", "c_smallint_2",