You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lz...@apache.org on 2017/07/19 02:06:41 UTC
[1/3] beam git commit: [BEAM-2621] BeamSqlRecordType -> BeamSqlRowType
Repository: beam
Updated Branches:
refs/heads/DSL_SQL a1f7cf6de -> d4d615a72
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
index 41a786f..9ed56b4 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -18,8 +18,8 @@
package org.apache.beam.dsls.sql.schema.text;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.PTransform;
@@ -46,13 +46,13 @@ public class BeamTextCSVTable extends BeamTextTable {
/**
* CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
*/
- public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern) {
- this(beamSqlRecordType, filePattern, CSVFormat.DEFAULT);
+ public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern) {
+ this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
}
- public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern,
+ public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern,
CSVFormat csvFormat) {
- super(beamSqlRecordType, filePattern);
+ super(beamSqlRowType, filePattern);
this.csvFormat = csvFormat;
}
@@ -60,11 +60,11 @@ public class BeamTextCSVTable extends BeamTextTable {
public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
.apply("parseCSVLine",
- new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat));
+ new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
}
@Override
public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
- return new BeamTextCSVTableIOWriter(beamSqlRecordType, filePattern, csvFormat);
+ return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
index ef0a465..874c3e4 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -21,9 +21,8 @@ package org.apache.beam.dsls.sql.schema.text;
import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
import java.io.Serializable;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -37,13 +36,13 @@ public class BeamTextCSVTableIOReader
extends PTransform<PCollection<String>, PCollection<BeamSqlRow>>
implements Serializable {
private String filePattern;
- protected BeamSqlRecordType beamSqlRecordType;
+ protected BeamSqlRowType beamSqlRowType;
protected CSVFormat csvFormat;
- public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRecordType, String filePattern,
+ public BeamTextCSVTableIOReader(BeamSqlRowType beamSqlRowType, String filePattern,
CSVFormat csvFormat) {
this.filePattern = filePattern;
- this.beamSqlRecordType = beamSqlRecordType;
+ this.beamSqlRowType = beamSqlRowType;
this.csvFormat = csvFormat;
}
@@ -53,7 +52,7 @@ public class BeamTextCSVTableIOReader
@ProcessElement
public void processElement(ProcessContext ctx) {
String str = ctx.element();
- ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRecordType));
+ ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType));
}
}));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
index 35a546c..f61bb71 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -21,9 +21,8 @@ package org.apache.beam.dsls.sql.schema.text;
import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
import java.io.Serializable;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -38,13 +37,13 @@ import org.apache.commons.csv.CSVFormat;
public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone>
implements Serializable {
private String filePattern;
- protected BeamSqlRecordType beamSqlRecordType;
+ protected BeamSqlRowType beamSqlRowType;
protected CSVFormat csvFormat;
- public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRecordType, String filePattern,
+ public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePattern,
CSVFormat csvFormat) {
this.filePattern = filePattern;
- this.beamSqlRecordType = beamSqlRecordType;
+ this.beamSqlRowType = beamSqlRowType;
this.csvFormat = csvFormat;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
index 525c210..6dc6cd0 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
/**
* {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
@@ -30,8 +30,8 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
protected String filePattern;
- protected BeamTextTable(BeamSqlRecordType beamSqlRecordType, String filePattern) {
- super(beamSqlRecordType);
+ protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) {
+ super(beamSqlRowType);
this.filePattern = filePattern;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
index 34b169f..5b21765 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
@@ -27,8 +27,8 @@ import java.util.Iterator;
import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.beam.sdk.coders.BigDecimalCoder;
@@ -57,13 +57,13 @@ public class BeamAggregationTransforms implements Serializable{
* Merge KV to single record.
*/
public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
- private BeamSqlRecordType outRecordType;
+ private BeamSqlRowType outRowType;
private List<String> aggFieldNames;
private int windowStartFieldIdx;
- public MergeAggregationRecord(BeamSqlRecordType outRecordType, List<AggregateCall> aggList
+ public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList
, int windowStartFieldIdx) {
- this.outRecordType = outRecordType;
+ this.outRowType = outRowType;
this.aggFieldNames = new ArrayList<>();
for (AggregateCall ac : aggList) {
aggFieldNames.add(ac.getName());
@@ -73,7 +73,7 @@ public class BeamAggregationTransforms implements Serializable{
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
- BeamSqlRow outRecord = new BeamSqlRow(outRecordType);
+ BeamSqlRow outRecord = new BeamSqlRow(outRowType);
outRecord.updateWindowRange(c.element().getKey(), window);
KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element();
@@ -109,7 +109,7 @@ public class BeamAggregationTransforms implements Serializable{
@Override
public BeamSqlRow apply(BeamSqlRow input) {
- BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(input.getDataType());
+ BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType());
BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
keyOfRecord.updateWindowRange(input, null);
@@ -119,14 +119,14 @@ public class BeamAggregationTransforms implements Serializable{
return keyOfRecord;
}
- private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
+ private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) {
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (int idx : groupByKeys) {
fieldNames.add(dataType.getFieldsName().get(idx));
fieldTypes.add(dataType.getFieldsType().get(idx));
}
- return BeamSqlRecordType.create(fieldNames, fieldTypes);
+ return BeamSqlRowType.create(fieldNames, fieldTypes);
}
}
@@ -154,10 +154,10 @@ public class BeamAggregationTransforms implements Serializable{
extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> {
private List<BeamSqlUdaf> aggregators;
private List<BeamSqlExpression> sourceFieldExps;
- private BeamSqlRecordType finalRecordType;
+ private BeamSqlRowType finalRowType;
public AggregationAdaptor(List<AggregateCall> aggregationCalls,
- BeamSqlRecordType sourceRowRecordType) {
+ BeamSqlRowType sourceRowType) {
aggregators = new ArrayList<>();
sourceFieldExps = new ArrayList<>();
List<String> outFieldsName = new ArrayList<>();
@@ -165,7 +165,7 @@ public class BeamAggregationTransforms implements Serializable{
for (AggregateCall call : aggregationCalls) {
int refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0;
BeamSqlExpression sourceExp = new BeamSqlInputRefExpression(
- CalciteUtils.getFieldType(sourceRowRecordType, refIndex), refIndex);
+ CalciteUtils.getFieldType(sourceRowType, refIndex), refIndex);
sourceFieldExps.add(sourceExp);
outFieldsName.add(call.name);
@@ -206,7 +206,7 @@ public class BeamAggregationTransforms implements Serializable{
break;
}
}
- finalRecordType = BeamSqlRecordType.create(outFieldsName, outFieldsType);
+ finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType);
}
@Override
public AggregationAccumulator createAccumulator() {
@@ -241,7 +241,7 @@ public class BeamAggregationTransforms implements Serializable{
}
@Override
public BeamSqlRow extractOutput(AggregationAccumulator accumulator) {
- BeamSqlRow result = new BeamSqlRow(finalRecordType);
+ BeamSqlRow result = new BeamSqlRow(finalRowType);
for (int idx = 0; idx < aggregators.size(); ++idx) {
result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
index 8169b83..9ea4376 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
@@ -60,7 +60,7 @@ public class BeamJoinTransforms {
? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) :
input.getDataType().getFieldsType().get(joinColumns.get(i).getValue()));
}
- BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+ BeamSqlRowType type = BeamSqlRowType.create(names, types);
// build the row
BeamSqlRow row = new BeamSqlRow(type);
@@ -149,7 +149,7 @@ public class BeamJoinTransforms {
List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
types.addAll(leftRow.getDataType().getFieldsType());
types.addAll(rightRow.getDataType().getFieldsType());
- BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+ BeamSqlRowType type = BeamSqlRowType.create(names, types);
BeamSqlRow row = new BeamSqlRow(type);
// build the row
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
index 2a3357c..886ddcf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
@@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql.transform;
import java.util.List;
import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -34,14 +34,14 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
private String stepName;
private BeamSqlExpressionExecutor executor;
- private BeamSqlRecordType outputRecordType;
+ private BeamSqlRowType outputRowType;
public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
- BeamSqlRecordType outputRecordType) {
+ BeamSqlRowType outputRowType) {
super();
this.stepName = stepName;
this.executor = executor;
- this.outputRecordType = outputRecordType;
+ this.outputRowType = outputRowType;
}
@Setup
@@ -51,11 +51,11 @@ public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
- BeamSqlRow inputRecord = c.element();
- List<Object> results = executor.execute(inputRecord);
+ BeamSqlRow inputRow = c.element();
+ List<Object> results = executor.execute(inputRow);
- BeamSqlRow outRow = new BeamSqlRow(outputRecordType);
- outRow.updateWindowRange(inputRecord, window);
+ BeamSqlRow outRow = new BeamSqlRow(outputRowType);
+ outRow.updateWindowRange(inputRow, window);
for (int idx = 0; idx < results.size(); ++idx) {
BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
index 919ae5f..4b8696b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -78,27 +78,27 @@ public class CalciteUtils {
/**
* Get the {@code SqlTypeName} for the specified column of a table.
*/
- public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) {
+ public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) {
return toCalciteType(schema.getFieldsType().get(index));
}
/**
- * Generate {@code BeamSqlRecordType} from {@code RelDataType} which is used to create table.
+ * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
*/
- public static BeamSqlRecordType toBeamRecordType(RelDataType tableInfo) {
+ public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) {
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (RelDataTypeField f : tableInfo.getFieldList()) {
fieldNames.add(f.getName());
fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
}
- return BeamSqlRecordType.create(fieldNames, fieldTypes);
+ return BeamSqlRowType.create(fieldNames, fieldTypes);
}
/**
* Create an instance of {@code RelDataType} so it can be used to create a table.
*/
- public static RelProtoDataType toCalciteRecordType(final BeamSqlRecordType that) {
+ public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) {
return new RelProtoDataType() {
@Override
public RelDataType apply(RelDataTypeFactory a) {
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
index 471a856..a142514 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.dsls.sql;
import java.sql.Types;
import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -55,7 +55,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollection<BeamSqlRow> result =
input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
BeamSqlRow record = new BeamSqlRow(resultType);
@@ -98,7 +98,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testAggregationFunctions", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(
+ BeamSqlRowType resultType = BeamSqlRowType.create(
Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2",
"min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5",
"max5", "min5", "max6", "min6"),
@@ -167,7 +167,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollection<BeamSqlRow> result =
input.apply("testDistinct", BeamSql.simpleQuery(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
BeamSqlRow record1 = new BeamSqlRow(resultType);
@@ -216,7 +216,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testTumbleWindow", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(
+ BeamSqlRowType resultType = BeamSqlRowType.create(
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
@@ -263,7 +263,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollection<BeamSqlRow> result =
input.apply("testHopWindow", BeamSql.simpleQuery(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(
+ BeamSqlRowType resultType = BeamSqlRowType.create(
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
@@ -325,7 +325,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testSessionWindow", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(
+ BeamSqlRowType resultType = BeamSqlRowType.create(
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
index 57fcbc3..24f1a0a 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
@@ -24,9 +24,9 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
@@ -52,7 +52,7 @@ public class BeamSqlDslBase {
@Rule
public ExpectedException exceptions = ExpectedException.none();
- public static BeamSqlRecordType recordTypeInTableA;
+ public static BeamSqlRowType rowTypeInTableA;
public static List<BeamSqlRow> recordsInTableA;
//bounded PCollections
@@ -65,22 +65,22 @@ public class BeamSqlDslBase {
@BeforeClass
public static void prepareClass() throws ParseException {
- recordTypeInTableA = BeamSqlRecordType.create(
+ rowTypeInTableA = BeamSqlRowType.create(
Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string",
"f_timestamp", "f_int2"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,
Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER));
- recordsInTableA = prepareInputRecordsInTableA();
+ recordsInTableA = prepareInputRowsInTableA();
}
@Before
public void preparePCollections(){
boundedInput1 = PBegin.in(pipeline).apply("boundedInput1",
- Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+ Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
boundedInput2 = PBegin.in(pipeline).apply("boundedInput2",
- Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+ Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
unboundedInput1 = prepareUnboundedPCollection1();
unboundedInput2 = prepareUnboundedPCollection2();
@@ -88,7 +88,7 @@ public class BeamSqlDslBase {
private PCollection<BeamSqlRow> prepareUnboundedPCollection1() {
TestStream.Builder<BeamSqlRow> values = TestStream
- .create(new BeamSqlRowCoder(recordTypeInTableA));
+ .create(new BeamSqlRowCoder(rowTypeInTableA));
for (BeamSqlRow row : recordsInTableA) {
values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
@@ -100,7 +100,7 @@ public class BeamSqlDslBase {
private PCollection<BeamSqlRow> prepareUnboundedPCollection2() {
TestStream.Builder<BeamSqlRow> values = TestStream
- .create(new BeamSqlRowCoder(recordTypeInTableA));
+ .create(new BeamSqlRowCoder(rowTypeInTableA));
BeamSqlRow row = recordsInTableA.get(0);
values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
@@ -109,10 +109,10 @@ public class BeamSqlDslBase {
return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity());
}
- private static List<BeamSqlRow> prepareInputRecordsInTableA() throws ParseException{
+ private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{
List<BeamSqlRow> rows = new ArrayList<>();
- BeamSqlRow row1 = new BeamSqlRow(recordTypeInTableA);
+ BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA);
row1.addField(0, 1);
row1.addField(1, 1000L);
row1.addField(2, Short.valueOf("1"));
@@ -124,7 +124,7 @@ public class BeamSqlDslBase {
row1.addField(8, 0);
rows.add(row1);
- BeamSqlRow row2 = new BeamSqlRow(recordTypeInTableA);
+ BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA);
row2.addField(0, 2);
row2.addField(1, 2000L);
row2.addField(2, Short.valueOf("2"));
@@ -136,7 +136,7 @@ public class BeamSqlDslBase {
row2.addField(8, 0);
rows.add(row2);
- BeamSqlRow row3 = new BeamSqlRow(recordTypeInTableA);
+ BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA);
row3.addField(0, 3);
row3.addField(1, 3000L);
row3.addField(2, Short.valueOf("3"));
@@ -148,7 +148,7 @@ public class BeamSqlDslBase {
row3.addField(8, 0);
rows.add(row3);
- BeamSqlRow row4 = new BeamSqlRow(recordTypeInTableA);
+ BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA);
row4.addField(0, 4);
row4.addField(1, 4000L);
row4.addField(2, Short.valueOf("4"));
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
index ae5f4e5..e010915 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
@@ -23,9 +23,9 @@ import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER
import java.sql.Types;
import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
@@ -41,8 +41,8 @@ public class BeamSqlDslJoinTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- private static final BeamSqlRecordType SOURCE_RECORD_TYPE =
- BeamSqlRecordType.create(
+ private static final BeamSqlRowType SOURCE_RECORD_TYPE =
+ BeamSqlRowType.create(
Arrays.asList(
"order_id", "site_id", "price"
),
@@ -54,8 +54,8 @@ public class BeamSqlDslJoinTest {
private static final BeamSqlRowCoder SOURCE_CODER =
new BeamSqlRowCoder(SOURCE_RECORD_TYPE);
- private static final BeamSqlRecordType RESULT_RECORD_TYPE =
- BeamSqlRecordType.create(
+ private static final BeamSqlRowType RESULT_RECORD_TYPE =
+ BeamSqlRowType.create(
Arrays.asList(
"order_id", "site_id", "price", "order_id0", "site_id0", "price0"
),
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
index 10f61b0..ab5a639 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.dsls.sql;
import java.sql.Types;
import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -81,7 +81,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testPartialFields", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
BeamSqlRow record = new BeamSqlRow(resultType);
@@ -116,7 +116,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
BeamSqlRow record1 = new BeamSqlRow(resultType);
@@ -163,7 +163,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testPartialFieldsInRows", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
BeamSqlRow record1 = new BeamSqlRow(resultType);
@@ -210,7 +210,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testLiteralField", BeamSql.query(sql));
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"),
Arrays.asList(Types.INTEGER));
BeamSqlRow record = new BeamSqlRow(resultType);
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
index 332a273..726f658 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
@@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql;
import java.sql.Types;
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
import org.apache.beam.sdk.testing.PAssert;
@@ -39,7 +39,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
*/
@Test
public void testUdaf() throws Exception {
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"),
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"),
Arrays.asList(Types.INTEGER, Types.INTEGER));
BeamSqlRow record = new BeamSqlRow(resultType);
@@ -69,7 +69,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
*/
@Test
public void testUdf() throws Exception{
- BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"),
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"),
Arrays.asList(Types.INTEGER, Types.INTEGER));
BeamSqlRow record = new BeamSqlRow(resultType);
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
index 8c0a28d..a669635 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
@@ -21,8 +21,8 @@ package org.apache.beam.dsls.sql;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.transforms.DoFn;
/**
@@ -69,7 +69,7 @@ public class TestUtils {
* {@code}
*/
public static class RowsBuilder {
- private BeamSqlRecordType type;
+ private BeamSqlRowType type;
private List<BeamSqlRow> rows = new ArrayList<>();
/**
@@ -86,9 +86,9 @@ public class TestUtils {
* @args pairs of column type and column names.
*/
public static RowsBuilder of(final Object... args) {
- BeamSqlRecordType beamSQLRecordType = buildBeamSqlRecordType(args);
+ BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args);
RowsBuilder builder = new RowsBuilder();
- builder.type = beamSQLRecordType;
+ builder.type = beamSQLRowType;
return builder;
}
@@ -99,13 +99,13 @@ public class TestUtils {
* <p>For example:
* <pre>{@code
* TestUtils.RowsBuilder.of(
- * beamSqlRecordType
+ * beamSqlRowType
* )}</pre>
- * @beamSQLRecordType the record type.
+ * @beamSQLRowType the record type.
*/
- public static RowsBuilder of(final BeamSqlRecordType beamSQLRecordType) {
+ public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) {
RowsBuilder builder = new RowsBuilder();
- builder.type = beamSQLRecordType;
+ builder.type = beamSQLRowType;
return builder;
}
@@ -140,12 +140,12 @@ public class TestUtils {
}
/**
- * Convenient way to build a {@code BeamSqlRecordType}.
+ * Convenient way to build a {@code BeamSqlRowType}.
*
* <p>e.g.
*
* <pre>{@code
- * buildBeamSqlRecordType(
+ * buildBeamSqlRowType(
* Types.BIGINT, "order_id",
* Types.INTEGER, "site_id",
* Types.DOUBLE, "price",
@@ -153,7 +153,7 @@ public class TestUtils {
* )
* }</pre>
*/
- public static BeamSqlRecordType buildBeamSqlRecordType(Object... args) {
+ public static BeamSqlRowType buildBeamSqlRowType(Object... args) {
List<Integer> types = new ArrayList<>();
List<String> names = new ArrayList<>();
@@ -162,7 +162,7 @@ public class TestUtils {
names.add((String) args[i + 1]);
}
- return BeamSqlRecordType.create(names, types);
+ return BeamSqlRowType.create(names, types);
}
/**
@@ -172,14 +172,14 @@ public class TestUtils {
*
* <pre>{@code
* buildRows(
- * recordType,
+ * rowType,
* 1, 1, 1, // the first row
* 2, 2, 2, // the second row
* ...
* )
* }</pre>
*/
- public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, List args) {
+ public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) {
List<BeamSqlRow> rows = new ArrayList<>();
int fieldCount = type.size();
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index ddbc3d8..b9ce9b4 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -32,9 +32,9 @@ import java.util.TimeZone;
import org.apache.beam.dsls.sql.BeamSql;
import org.apache.beam.dsls.sql.TestUtils;
import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
@@ -63,7 +63,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
public final TestPipeline pipeline = TestPipeline.create();
protected PCollection<BeamSqlRow> getTestPCollection() {
- BeamSqlRecordType type = BeamSqlRecordType.create(
+ BeamSqlRowType type = BeamSqlRowType.create(
Arrays.asList("ts", "c_tinyint", "c_smallint",
"c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
"c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
@@ -156,7 +156,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
PAssert.that(rows).containsInAnyOrder(
TestUtils.RowsBuilder
- .of(BeamSqlRecordType.create(names, types))
+ .of(BeamSqlRowType.create(names, types))
.addRows(values)
.getRows()
);
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
index 5afd273..d7b54c7 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
@@ -23,8 +23,8 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
import org.apache.beam.dsls.sql.planner.BeamRelDataTypeSystem;
import org.apache.beam.dsls.sql.planner.BeamRuleSets;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.Lex;
@@ -57,7 +57,7 @@ public class BeamSqlFnExecutorTestBase {
RelDataTypeSystem.DEFAULT);
public static RelDataType relDataType;
- public static BeamSqlRecordType beamRecordType;
+ public static BeamSqlRowType beamRowType;
public static BeamSqlRow record;
public static RelBuilder relBuilder;
@@ -70,8 +70,8 @@ public class BeamSqlFnExecutorTestBase {
.add("price", SqlTypeName.DOUBLE)
.add("order_time", SqlTypeName.BIGINT).build();
- beamRecordType = CalciteUtils.toBeamRecordType(relDataType);
- record = new BeamSqlRow(beamRecordType);
+ beamRowType = CalciteUtils.toBeamRowType(relDataType);
+ record = new BeamSqlRow(beamRowType);
record.addField(0, 1234567L);
record.addField(1, 0);
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
index 84f49a9..6c1dcb2 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.dsls.sql.mock;
-import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
+import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType;
import static org.apache.beam.dsls.sql.TestUtils.buildRows;
import java.util.ArrayList;
@@ -25,8 +25,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -45,8 +45,8 @@ public class MockedBoundedTable extends MockedTable {
/** rows flow out from this table. */
private final List<BeamSqlRow> rows = new ArrayList<>();
- public MockedBoundedTable(BeamSqlRecordType beamSqlRecordType) {
- super(beamSqlRecordType);
+ public MockedBoundedTable(BeamSqlRowType beamSqlRowType) {
+ super(beamSqlRowType);
}
/**
@@ -63,13 +63,13 @@ public class MockedBoundedTable extends MockedTable {
* }</pre>
*/
public static MockedBoundedTable of(final Object... args){
- return new MockedBoundedTable(buildBeamSqlRecordType(args));
+ return new MockedBoundedTable(buildBeamSqlRowType(args));
}
/**
* Build a mocked bounded table with the specified type.
*/
- public static MockedBoundedTable of(final BeamSqlRecordType type) {
+ public static MockedBoundedTable of(final BeamSqlRowType type) {
return new MockedBoundedTable(type);
}
@@ -88,7 +88,7 @@ public class MockedBoundedTable extends MockedTable {
* }</pre>
*/
public MockedBoundedTable addRows(Object... args) {
- List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args));
+ List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args));
this.rows.addAll(rows);
return this;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
index eed740a..858ae88 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
@@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql.mock;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.values.PDone;
*/
public abstract class MockedTable extends BaseBeamTable {
public static final AtomicInteger COUNTER = new AtomicInteger();
- public MockedTable(BeamSqlRecordType beamSqlRecordType) {
- super(beamSqlRecordType);
+ public MockedTable(BeamSqlRowType beamSqlRowType) {
+ super(beamSqlRowType);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
index 0f8c912..ee6eb22 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
@@ -18,16 +18,16 @@
package org.apache.beam.dsls.sql.mock;
-import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
+import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType;
import static org.apache.beam.dsls.sql.TestUtils.buildRows;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.PCollection;
@@ -44,8 +44,8 @@ public class MockedUnboundedTable extends MockedTable {
private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
/** specify the index of column in the row which stands for the event time field. */
private int timestampField;
- private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) {
- super(beamSqlRecordType);
+ private MockedUnboundedTable(BeamSqlRowType beamSqlRowType) {
+ super(beamSqlRowType);
}
/**
@@ -62,7 +62,7 @@ public class MockedUnboundedTable extends MockedTable {
* }</pre>
*/
public static MockedUnboundedTable of(final Object... args){
- return new MockedUnboundedTable(buildBeamSqlRecordType(args));
+ return new MockedUnboundedTable(buildBeamSqlRowType(args));
}
public MockedUnboundedTable timestampColumnIndex(int idx) {
@@ -85,7 +85,7 @@ public class MockedUnboundedTable extends MockedTable {
* }</pre>
*/
public MockedUnboundedTable addRows(Duration duration, Object... args) {
- List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args));
+ List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args));
// record the watermark + rows
this.timestampedRows.add(Pair.of(duration, rows));
return this;
@@ -97,7 +97,7 @@ public class MockedUnboundedTable extends MockedTable {
@Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
TestStream.Builder<BeamSqlRow> values = TestStream.create(
- new BeamSqlRowCoder(beamSqlRecordType));
+ new BeamSqlRowCoder(beamSqlRowType));
for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
index cf1d714..e41e341 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -58,10 +58,10 @@ public class BeamSqlRowCoderTest {
}
};
- BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(
+ BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(
protoRowType.apply(new JavaTypeFactoryImpl(
RelDataTypeSystem.DEFAULT)));
- BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
+ BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
row.addField("col_tinyint", Byte.valueOf("1"));
row.addField("col_smallint", Short.valueOf("1"));
row.addField("col_integer", 1);
@@ -77,7 +77,7 @@ public class BeamSqlRowCoderTest {
row.addField("col_boolean", true);
- BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType);
+ BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRowType);
CoderProperties.coderDecodeEncodeEqual(coder, row);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
index 9cd0915..01cd960 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
@@ -19,10 +19,9 @@
package org.apache.beam.dsls.sql.schema.kafka;
import java.io.Serializable;
-
import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -91,8 +90,8 @@ public class BeamKafkaCSVTableTest {
pipeline.run();
}
- private static BeamSqlRecordType genRowType() {
- return CalciteUtils.toBeamRecordType(new RelProtoDataType() {
+ private static BeamSqlRowType genRowType() {
+ return CalciteUtils.toBeamRowType(new RelProtoDataType() {
@Override public RelDataType apply(RelDataTypeFactory a0) {
return a0.builder().add("order_id", SqlTypeName.BIGINT)
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
index 176df46..b6e11e5 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
@@ -31,10 +31,9 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-
import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -81,20 +80,20 @@ public class BeamTextCSVTableTest {
private static File writerTargetFile;
@Test public void testBuildIOReader() {
- PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(),
+ PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
PAssert.that(rows).containsInAnyOrder(testDataRows);
pipeline.run();
}
@Test public void testBuildIOWriter() {
- new BeamTextCSVTable(buildBeamSqlRecordType(),
+ new BeamTextCSVTable(buildBeamSqlRowType(),
readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
- .apply(new BeamTextCSVTable(buildBeamSqlRecordType(), writerTargetFile.getAbsolutePath())
+ .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath())
.buildIOWriter());
pipeline.run();
- PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(),
+ PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
// confirm the two reads match
@@ -167,11 +166,11 @@ public class BeamTextCSVTableTest {
.add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build();
}
- private static BeamSqlRecordType buildBeamSqlRecordType() {
- return CalciteUtils.toBeamRecordType(buildRelDataType());
+ private static BeamSqlRowType buildBeamSqlRowType() {
+ return CalciteUtils.toBeamRowType(buildRelDataType());
}
private static BeamSqlRow buildRow(Object[] data) {
- return new BeamSqlRow(buildBeamSqlRecordType(), Arrays.asList(data));
+ return new BeamSqlRow(buildBeamSqlRowType(), Arrays.asList(data));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
index a0fed22..5d5d4fc 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
@@ -21,11 +21,10 @@ import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-
import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.beam.sdk.coders.IterableCoder;
@@ -64,9 +63,9 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
private List<AggregateCall> aggCalls;
- private BeamSqlRecordType keyType;
- private BeamSqlRecordType aggPartType;
- private BeamSqlRecordType outputType;
+ private BeamSqlRowType keyType;
+ private BeamSqlRowType aggPartType;
+ private BeamSqlRowType outputType;
private BeamSqlRowCoder inRecordCoder;
private BeamSqlRowCoder keyCoder;
@@ -405,7 +404,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
/**
* Row type of final output row.
*/
- private BeamSqlRecordType prepareFinalRowType() {
+ private BeamSqlRowType prepareFinalRowType() {
FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
List<KV<String, SqlTypeName>> columnMetadata =
Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
@@ -433,7 +432,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
for (KV<String, SqlTypeName> cm : columnMetadata) {
builder.add(cm.getKey(), cm.getValue());
}
- return CalciteUtils.toBeamRecordType(builder.build());
+ return CalciteUtils.toBeamRowType(builder.build());
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
index 2e91405..4045bc8 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
@@ -23,8 +23,8 @@ import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
@@ -38,7 +38,7 @@ import org.junit.BeforeClass;
public class BeamTransformBaseTest {
public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- public static BeamSqlRecordType inputRowType;
+ public static BeamSqlRowType inputRowType;
public static List<BeamSqlRow> inputRows;
@BeforeClass
@@ -66,14 +66,14 @@ public class BeamTransformBaseTest {
}
/**
- * create a {@code BeamSqlRecordType} for given column metadata.
+ * create a {@code BeamSqlRowType} for given column metadata.
*/
- public static BeamSqlRecordType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
+ public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
for (KV<String, SqlTypeName> cm : columnMetadata) {
builder.add(cm.getKey(), cm.getValue());
}
- return CalciteUtils.toBeamRecordType(builder.build());
+ return CalciteUtils.toBeamRowType(builder.build());
}
/**
@@ -89,7 +89,7 @@ public class BeamTransformBaseTest {
*/
public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
List<Object> rowValues){
- BeamSqlRecordType rowType = initTypeOfSqlRow(columnMetadata);
+ BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata);
return new BeamSqlRow(rowType, rowValues);
}
[2/3] beam git commit: [BEAM-2621] BeamSqlRecordType -> BeamSqlRowType
Posted by lz...@apache.org.
[BEAM-2621] BeamSqlRecordType -> BeamSqlRowType
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a9c8a8a1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a9c8a8a1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a9c8a8a1
Branch: refs/heads/DSL_SQL
Commit: a9c8a8a1e1f21d31c973e12b3c05c118e21fa43b
Parents: a1f7cf6
Author: James Xu <xu...@gmail.com>
Authored: Mon Jul 17 17:55:56 2017 +0800
Committer: JingsongLi <lz...@aliyun.com>
Committed: Wed Jul 19 10:05:17 2017 +0800
----------------------------------------------------------------------
.../org/apache/beam/dsls/sql/BeamSqlEnv.java | 12 +++---
.../beam/dsls/sql/example/BeamSqlExample.java | 4 +-
.../interpreter/BeamSqlExpressionExecutor.java | 2 +-
.../dsls/sql/interpreter/BeamSqlFnExecutor.java | 4 +-
.../operator/BeamSqlCaseExpression.java | 8 ++--
.../operator/BeamSqlCastExpression.java | 22 +++++------
.../operator/BeamSqlCompareExpression.java | 6 +--
.../interpreter/operator/BeamSqlExpression.java | 2 +-
.../operator/BeamSqlInputRefExpression.java | 4 +-
.../operator/BeamSqlIsNotNullExpression.java | 4 +-
.../operator/BeamSqlIsNullExpression.java | 4 +-
.../interpreter/operator/BeamSqlPrimitive.java | 2 +-
.../operator/BeamSqlReinterpretExpression.java | 6 +--
.../operator/BeamSqlUdfExpression.java | 4 +-
.../operator/BeamSqlWindowEndExpression.java | 4 +-
.../operator/BeamSqlWindowExpression.java | 4 +-
.../operator/BeamSqlWindowStartExpression.java | 4 +-
.../arithmetic/BeamSqlArithmeticExpression.java | 6 +--
.../date/BeamSqlCurrentDateExpression.java | 2 +-
.../date/BeamSqlCurrentTimeExpression.java | 2 +-
.../date/BeamSqlCurrentTimestampExpression.java | 2 +-
.../date/BeamSqlDateCeilExpression.java | 4 +-
.../date/BeamSqlDateFloorExpression.java | 4 +-
.../operator/date/BeamSqlExtractExpression.java | 4 +-
.../operator/logical/BeamSqlAndExpression.java | 4 +-
.../operator/logical/BeamSqlNotExpression.java | 4 +-
.../operator/logical/BeamSqlOrExpression.java | 4 +-
.../math/BeamSqlMathBinaryExpression.java | 4 +-
.../math/BeamSqlMathUnaryExpression.java | 4 +-
.../operator/math/BeamSqlPiExpression.java | 2 +-
.../string/BeamSqlCharLengthExpression.java | 4 +-
.../string/BeamSqlConcatExpression.java | 6 +--
.../string/BeamSqlInitCapExpression.java | 4 +-
.../operator/string/BeamSqlLowerExpression.java | 4 +-
.../string/BeamSqlOverlayExpression.java | 10 ++---
.../string/BeamSqlPositionExpression.java | 8 ++--
.../string/BeamSqlSubstringExpression.java | 8 ++--
.../operator/string/BeamSqlTrimExpression.java | 10 ++---
.../operator/string/BeamSqlUpperExpression.java | 4 +-
.../beam/dsls/sql/rel/BeamAggregationRel.java | 22 +++++------
.../apache/beam/dsls/sql/rel/BeamFilterRel.java | 2 +-
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 2 +-
.../apache/beam/dsls/sql/rel/BeamJoinRel.java | 12 +++---
.../beam/dsls/sql/rel/BeamProjectRel.java | 4 +-
.../apache/beam/dsls/sql/rel/BeamSortRel.java | 2 +-
.../apache/beam/dsls/sql/rel/BeamValuesRel.java | 9 ++---
.../beam/dsls/sql/schema/BaseBeamTable.java | 10 ++---
.../dsls/sql/schema/BeamPCollectionTable.java | 8 ++--
.../beam/dsls/sql/schema/BeamSqlRecordType.java | 40 --------------------
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 10 ++---
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 6 +--
.../beam/dsls/sql/schema/BeamSqlRowType.java | 40 ++++++++++++++++++++
.../beam/dsls/sql/schema/BeamSqlTable.java | 2 +-
.../beam/dsls/sql/schema/BeamTableUtils.java | 10 ++---
.../sql/schema/kafka/BeamKafkaCSVTable.java | 29 +++++++-------
.../dsls/sql/schema/kafka/BeamKafkaTable.java | 11 +++---
.../dsls/sql/schema/text/BeamTextCSVTable.java | 14 +++----
.../schema/text/BeamTextCSVTableIOReader.java | 11 +++---
.../schema/text/BeamTextCSVTableIOWriter.java | 9 ++---
.../dsls/sql/schema/text/BeamTextTable.java | 6 +--
.../transform/BeamAggregationTransforms.java | 26 ++++++-------
.../dsls/sql/transform/BeamJoinTransforms.java | 6 +--
.../dsls/sql/transform/BeamSqlProjectFn.java | 16 ++++----
.../beam/dsls/sql/utils/CalciteUtils.java | 12 +++---
.../dsls/sql/BeamSqlDslAggregationTest.java | 14 +++----
.../apache/beam/dsls/sql/BeamSqlDslBase.java | 26 ++++++-------
.../beam/dsls/sql/BeamSqlDslJoinTest.java | 10 ++---
.../beam/dsls/sql/BeamSqlDslProjectTest.java | 10 ++---
.../beam/dsls/sql/BeamSqlDslUdfUdafTest.java | 6 +--
.../org/apache/beam/dsls/sql/TestUtils.java | 28 +++++++-------
...mSqlBuiltinFunctionsIntegrationTestBase.java | 6 +--
.../interpreter/BeamSqlFnExecutorTestBase.java | 8 ++--
.../beam/dsls/sql/mock/MockedBoundedTable.java | 14 +++----
.../apache/beam/dsls/sql/mock/MockedTable.java | 6 +--
.../dsls/sql/mock/MockedUnboundedTable.java | 14 +++----
.../dsls/sql/schema/BeamSqlRowCoderTest.java | 6 +--
.../sql/schema/kafka/BeamKafkaCSVTableTest.java | 7 ++--
.../sql/schema/text/BeamTextCSVTableTest.java | 17 ++++-----
.../transform/BeamAggregationTransformTest.java | 13 +++----
.../schema/transform/BeamTransformBaseTest.java | 12 +++---
80 files changed, 354 insertions(+), 362 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
index e8c8c97..0e1ac98 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
@@ -73,7 +73,7 @@ public class BeamSqlEnv implements Serializable{
*
*/
public void registerTable(String tableName, BaseBeamTable table) {
- schema.add(tableName, new BeamCalciteTable(table.getRecordType()));
+ schema.add(tableName, new BeamCalciteTable(table.getRowType()));
planner.getSourceTables().put(tableName, table);
}
@@ -85,13 +85,13 @@ public class BeamSqlEnv implements Serializable{
}
private static class BeamCalciteTable implements ScannableTable, Serializable {
- private BeamSqlRecordType beamSqlRecordType;
- public BeamCalciteTable(BeamSqlRecordType beamSqlRecordType) {
- this.beamSqlRecordType = beamSqlRecordType;
+ private BeamSqlRowType beamSqlRowType;
+ public BeamCalciteTable(BeamSqlRowType beamSqlRowType) {
+ this.beamSqlRowType = beamSqlRowType;
}
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return CalciteUtils.toCalciteRecordType(this.beamSqlRecordType)
+ return CalciteUtils.toCalciteRowType(this.beamSqlRowType)
.apply(BeamQueryPlanner.TYPE_FACTORY);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 04fe451..91df2be 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -21,9 +21,9 @@ import java.sql.Types;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.dsls.sql.BeamSql;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -51,7 +51,7 @@ class BeamSqlExample {
//define the input row format
List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
- BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
+ BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes);
BeamSqlRow row = new BeamSqlRow(type);
row.addField(0, 1);
row.addField(1, "row");
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
index a314bf4..3732933 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
@@ -37,7 +37,7 @@ public interface BeamSqlExpressionExecutor extends Serializable {
* apply transformation to input record {@link BeamSqlRow}.
*
*/
- List<Object> execute(BeamSqlRow inputRecord);
+ List<Object> execute(BeamSqlRow inputRow);
void close();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
index 64bc880..0be918d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -427,10 +427,10 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
}
@Override
- public List<Object> execute(BeamSqlRow inputRecord) {
+ public List<Object> execute(BeamSqlRow inputRow) {
List<Object> results = new ArrayList<>();
for (BeamSqlExpression exp : exps) {
- results.add(exp.evaluate(inputRecord).getValue());
+ results.add(exp.evaluate(inputRow).getValue());
}
return results;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
index a15c42e..a30916b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
@@ -49,16 +49,16 @@ public class BeamSqlCaseExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
for (int i = 0; i < operands.size() - 1; i += 2) {
- if (opValueEvaluated(i, inputRecord)) {
+ if (opValueEvaluated(i, inputRow)) {
return BeamSqlPrimitive.of(
outputType,
- opValueEvaluated(i + 1, inputRecord)
+ opValueEvaluated(i + 1, inputRow)
);
}
}
return BeamSqlPrimitive.of(outputType,
- opValueEvaluated(operands.size() - 1, inputRecord));
+ opValueEvaluated(operands.size() - 1, inputRow));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
index 7e8ab03..524d1df 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
@@ -72,40 +72,40 @@ public class BeamSqlCastExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+ public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
SqlTypeName castOutputType = getOutputType();
switch (castOutputType) {
case INTEGER:
return BeamSqlPrimitive
- .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRecord)));
+ .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
case DOUBLE:
return BeamSqlPrimitive
- .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRecord)));
+ .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
case SMALLINT:
return BeamSqlPrimitive
- .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRecord)));
+ .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
case TINYINT:
return BeamSqlPrimitive
- .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRecord)));
+ .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
case BIGINT:
return BeamSqlPrimitive
- .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRecord)));
+ .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
case DECIMAL:
return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
- SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRecord)));
+ SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
case FLOAT:
return BeamSqlPrimitive
- .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRecord)));
+ .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
case CHAR:
case VARCHAR:
return BeamSqlPrimitive
- .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRecord).toString());
+ .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
case DATE:
return BeamSqlPrimitive
- .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRecord), outputDateFormat));
+ .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
case TIMESTAMP:
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
- toTimeStamp(opValueEvaluated(index, inputRecord), outputTimestampFormat));
+ toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
}
throw new UnsupportedOperationException(
String.format("Cast to type %s not supported", castOutputType));
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
index 3d96616..5076ccc 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
@@ -48,9 +48,9 @@ public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
- Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
- Object rightValue = operands.get(1).evaluate(inputRecord).getValue();
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+ Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+ Object rightValue = operands.get(1).evaluate(inputRow).getValue();
switch (operands.get(0).outputType) {
case BIGINT:
case DECIMAL:
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
index 33feb3e..9d2815c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
@@ -62,7 +62,7 @@ public abstract class BeamSqlExpression implements Serializable {
* Apply input record {@link BeamSqlRow} to this expression,
* the output value is wrapped with {@link BeamSqlPrimitive}.
*/
- public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRecord);
+ public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow);
public List<BeamSqlExpression> getOperands() {
return operands;
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
index b6d2b0b..710460b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
@@ -37,7 +37,7 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- return BeamSqlPrimitive.of(outputType, inputRecord.getFieldValue(inputRef));
+ public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
index e08e737..23d9c83 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
@@ -44,8 +44,8 @@ public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
- Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+ Object leftValue = operands.get(0).evaluate(inputRow).getValue();
return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
index d4e070d..4d3fd45 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
@@ -44,8 +44,8 @@ public class BeamSqlIsNullExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
- Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+ Object leftValue = operands.get(0).evaluate(inputRow).getValue();
return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index c5c80b9..51724bb 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -145,7 +145,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRecord) {
+ public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) {
return this;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
index 783466c..efdb2df 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -42,13 +42,13 @@ public class BeamSqlReinterpretExpression extends BeamSqlExpression {
&& SqlTypeName.DATETIME_TYPES.contains(opType(0));
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
if (opType(0) == SqlTypeName.TIME) {
- GregorianCalendar date = opValueEvaluated(0, inputRecord);
+ GregorianCalendar date = opValueEvaluated(0, inputRow);
return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
} else {
- Date date = opValueEvaluated(0, inputRecord);
+ Date date = opValueEvaluated(0, inputRow);
return BeamSqlPrimitive.of(outputType, date.getTime());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
index 6f18307..e389ef9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
@@ -51,14 +51,14 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+ public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
if (method == null) {
reConstructMethod();
}
try {
List<Object> paras = new ArrayList<>();
for (BeamSqlExpression e : getOperands()) {
- paras.add(e.evaluate(inputRecord).getValue());
+ paras.add(e.evaluate(inputRow).getValue());
}
return BeamSqlPrimitive.of(getOutputType(),
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
index 8bc090f..ecc6939 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -34,9 +34,9 @@ public class BeamSqlWindowEndExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
- new Date(inputRecord.getWindowEnd().getMillis()));
+ new Date(inputRow.getWindowEnd().getMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
index eb4c03b..71f0672 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
@@ -42,9 +42,9 @@ public class BeamSqlWindowExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
- (Date) operands.get(0).evaluate(inputRecord).getValue());
+ (Date) operands.get(0).evaluate(inputRow).getValue());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
index 1e2c0a2..f3aba2e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -35,9 +35,9 @@ public class BeamSqlWindowStartExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
- new Date(inputRecord.getWindowStart().getMillis()));
+ new Date(inputRow.getWindowStart().getMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index eac4c72..d62123c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -50,11 +50,11 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
super(operands, outputType);
}
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
BigDecimal left = BigDecimal.valueOf(
- Double.valueOf(opValueEvaluated(0, inputRecord).toString()));
+ Double.valueOf(opValueEvaluated(0, inputRow).toString()));
BigDecimal right = BigDecimal.valueOf(
- Double.valueOf(opValueEvaluated(1, inputRecord).toString()));
+ Double.valueOf(opValueEvaluated(1, inputRow).toString()));
BigDecimal result = calc(left, right);
return getCorrectlyTypedResult(result);
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
index 2f83140..c7df5ab 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
@@ -39,7 +39,7 @@ public class BeamSqlCurrentDateExpression extends BeamSqlExpression {
return getOperands().size() == 0;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
return BeamSqlPrimitive.of(outputType, new Date());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
index c15123a..46e5a43 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
@@ -45,7 +45,7 @@ public class BeamSqlCurrentTimeExpression extends BeamSqlExpression {
return opCount <= 1;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault());
ret.setTime(new Date());
return BeamSqlPrimitive.of(outputType, ret);
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
index 0ea12f1..303846d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
@@ -43,7 +43,7 @@ public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression {
return opCount <= 1;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
return BeamSqlPrimitive.of(outputType, new Date());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
index 68f1aa9..59e3e9c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
@@ -42,8 +42,8 @@ public class BeamSqlDateCeilExpression extends BeamSqlExpression {
&& opType(1) == SqlTypeName.SYMBOL;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- Date date = opValueEvaluated(0, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ Date date = opValueEvaluated(0, inputRow);
long time = date.getTime();
TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
index 4d446e3..64234f5 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
@@ -42,8 +42,8 @@ public class BeamSqlDateFloorExpression extends BeamSqlExpression {
&& opType(1) == SqlTypeName.SYMBOL;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- Date date = opValueEvaluated(0, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ Date date = opValueEvaluated(0, inputRow);
long time = date.getTime();
TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
index bc8ed0f..d41a249 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
@@ -61,8 +61,8 @@ public class BeamSqlExtractExpression extends BeamSqlExpression {
&& opType(1) == SqlTypeName.BIGINT;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- Long time = opValueEvaluated(1, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ Long time = opValueEvaluated(1, inputRow);
TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
index 5da43f4..5f6abe0 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
@@ -33,10 +33,10 @@ public class BeamSqlAndExpression extends BeamSqlLogicalExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
boolean result = true;
for (BeamSqlExpression exp : operands) {
- BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
+ BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
result = result && expOut.getValue();
if (!result) {
break;
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
index ffa0184..6df52aa 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -43,8 +43,8 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
return super.accept();
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- Boolean value = opValueEvaluated(0, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ Boolean value = opValueEvaluated(0, inputRow);
if (value == null) {
return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null);
} else {
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
index 9ca57f0..450638c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
@@ -33,10 +33,10 @@ public class BeamSqlOrExpression extends BeamSqlLogicalExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
boolean result = false;
for (BeamSqlExpression exp : operands) {
- BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
+ BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
result = result || expOut.getValue();
if (result) {
break;
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
index f79bcf6..2d444f8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
@@ -39,10 +39,10 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression {
return numberOfOperands() == 2 && isOperandNumeric(opType(0)) && isOperandNumeric(opType(1));
}
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
BeamSqlExpression leftOp = op(0);
BeamSqlExpression rightOp = op(1);
- return calculate(leftOp.evaluate(inputRecord), rightOp.evaluate(inputRecord));
+ return calculate(leftOp.evaluate(inputRow), rightOp.evaluate(inputRow));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
index a65333c..4733d09 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
@@ -45,9 +45,9 @@ public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression {
return acceptance;
}
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
BeamSqlExpression operand = op(0);
- return calculate(operand.evaluate(inputRecord));
+ return calculate(operand.evaluate(inputRow));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
index 4645951..9db810e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
@@ -36,7 +36,7 @@ public class BeamSqlPiExpression extends BeamSqlExpression {
return numberOfOperands() == 0;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
index 3ed9b80..7c61061 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.INTEGER);
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- String str = opValueEvaluated(0, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ String str = opValueEvaluated(0, inputRow);
return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
index e8e4e50..93e1f71 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -52,9 +52,9 @@ public class BeamSqlConcatExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- String left = opValueEvaluated(0, inputRecord);
- String right = opValueEvaluated(1, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ String left = opValueEvaluated(0, inputRow);
+ String right = opValueEvaluated(1, inputRow);
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
new StringBuilder(left.length() + right.length())
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
index 51dfe28..7726e27 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- String str = opValueEvaluated(0, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ String str = opValueEvaluated(0, inputRow);
StringBuilder ret = new StringBuilder(str);
boolean isInit = true;
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
index f70fb1a..cb198ec 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- String str = opValueEvaluated(0, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ String str = opValueEvaluated(0, inputRow);
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
index 20d9962..cb6a523 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -55,15 +55,15 @@ public class BeamSqlOverlayExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- String str = opValueEvaluated(0, inputRecord);
- String replaceStr = opValueEvaluated(1, inputRecord);
- int idx = opValueEvaluated(2, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ String str = opValueEvaluated(0, inputRow);
+ String replaceStr = opValueEvaluated(1, inputRow);
+ int idx = opValueEvaluated(2, inputRow);
// the index is 1 based.
idx -= 1;
int length = replaceStr.length();
if (operands.size() == 4) {
- length = opValueEvaluated(3, inputRecord);
+ length = opValueEvaluated(3, inputRow);
}
StringBuilder result = new StringBuilder(
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
index 1d09b51..144acbf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -57,12 +57,12 @@ public class BeamSqlPositionExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- String targetStr = opValueEvaluated(0, inputRecord);
- String containingStr = opValueEvaluated(1, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ String targetStr = opValueEvaluated(0, inputRow);
+ String containingStr = opValueEvaluated(1, inputRow);
int from = -1;
if (operands.size() == 3) {
- Number tmp = opValueEvaluated(2, inputRecord);
+ Number tmp = opValueEvaluated(2, inputRow);
from = tmp.intValue();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
index d9bbc98..8b33125 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -55,9 +55,9 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- String str = opValueEvaluated(0, inputRecord);
- int idx = opValueEvaluated(1, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ String str = opValueEvaluated(0, inputRow);
+ int idx = opValueEvaluated(1, inputRow);
int startIdx = idx;
if (startIdx > 0) {
// NOTE: SQL substring is 1 based(rather than 0 based)
@@ -70,7 +70,7 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
}
if (operands.size() == 3) {
- int length = opValueEvaluated(2, inputRecord);
+ int length = opValueEvaluated(2, inputRow);
if (length < 0) {
length = 0;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
index ac4d060..5e6c2bb 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -58,14 +58,14 @@ public class BeamSqlTrimExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
if (operands.size() == 1) {
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
- opValueEvaluated(0, inputRecord).toString().trim());
+ opValueEvaluated(0, inputRow).toString().trim());
} else {
- SqlTrimFunction.Flag type = opValueEvaluated(0, inputRecord);
- String targetStr = opValueEvaluated(1, inputRecord);
- String containingStr = opValueEvaluated(2, inputRecord);
+ SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow);
+ String targetStr = opValueEvaluated(1, inputRow);
+ String containingStr = opValueEvaluated(2, inputRow);
switch (type) {
case LEADING:
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
index 8fcaca4..efa9c95 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- String str = opValueEvaluated(0, inputRecord);
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ String str = opValueEvaluated(0, inputRow);
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 5389ec7..9dcb079 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -20,9 +20,9 @@ package org.apache.beam.dsls.sql.rel;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.beam.sdk.coders.KvCoder;
@@ -105,13 +105,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
stageName + "combineBy",
Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
- CalciteUtils.toBeamRecordType(input.getRowType()))))
+ CalciteUtils.toBeamRowType(input.getRowType()))))
.setCoder(KvCoder.of(keyCoder, aggCoder));
PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
- CalciteUtils.toBeamRecordType(getRowType()), getAggCallList(), windowFieldIdx)));
- mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+ CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx)));
+ mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
return mergedStream;
}
@@ -119,23 +119,23 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
/**
* Type of sub-rowrecord used as Group-By keys.
*/
- private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
- BeamSqlRecordType inputRecordType = CalciteUtils.toBeamRecordType(relDataType);
+ private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) {
+ BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType);
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (int i : groupSet.asList()) {
if (i != windowFieldIdx) {
- fieldNames.add(inputRecordType.getFieldsName().get(i));
- fieldTypes.add(inputRecordType.getFieldsType().get(i));
+ fieldNames.add(inputRowType.getFieldsName().get(i));
+ fieldTypes.add(inputRowType.getFieldsType().get(i));
}
}
- return BeamSqlRecordType.create(fieldNames, fieldTypes);
+ return BeamSqlRowType.create(fieldNames, fieldTypes);
}
/**
* Type of sub-rowrecord, that represents the list of aggregation fields.
*/
- private BeamSqlRecordType exAggFieldsSchema() {
+ private BeamSqlRowType exAggFieldsSchema() {
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldTypes = new ArrayList<>();
for (AggregateCall ac : getAggCallList()) {
@@ -143,7 +143,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
}
- return BeamSqlRecordType.create(fieldNames, fieldTypes);
+ return BeamSqlRowType.create(fieldNames, fieldTypes);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 07b5c7c..f802104 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -62,7 +62,7 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
PCollection<BeamSqlRow> filterStream = upstream.apply(stageName,
ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
- filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+ filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
return filterStream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index b26d2b8..6754991 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -56,7 +56,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
//If not, the source PColection is provided with BaseBeamTable.buildIOReader().
BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
return sourceTable.buildIOReader(inputPCollections.getPipeline())
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+ .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
index 3c92e42..3ebf152 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
@@ -23,9 +23,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.transform.BeamJoinTransforms;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.beam.sdk.coders.Coder;
@@ -97,7 +97,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
BeamSqlEnv sqlEnv)
throws Exception {
BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
- BeamSqlRecordType leftRowType = CalciteUtils.toBeamRecordType(left.getRowType());
+ BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
@@ -119,7 +119,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
names.add("c" + i);
types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
}
- BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types);
+ BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types);
Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
@@ -213,7 +213,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
PCollection<BeamSqlRow> ret = joinedRows
.apply(stageName + "_JoinParts2WholeRow",
MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+ .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
return ret;
}
@@ -249,13 +249,13 @@ public class BeamJoinRel extends Join implements BeamRelNode {
PCollection<BeamSqlRow> ret = leftRows
.apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+ .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
return ret;
}
private BeamSqlRow buildNullRow(BeamRelNode relNode) {
- BeamSqlRecordType leftType = CalciteUtils.toBeamRecordType(relNode.getRowType());
+ BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
BeamSqlRow nullRow = new BeamSqlRow(leftType);
for (int i = 0; i < leftType.size(); i++) {
nullRow.addField(i, null);
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index 2cdfc72..8f8e5ce 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -72,8 +72,8 @@ public class BeamProjectRel extends Project implements BeamRelNode {
PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo
.of(new BeamSqlProjectFn(getRelTypeName(), executor,
- CalciteUtils.toBeamRecordType(rowType))));
- projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+ CalciteUtils.toBeamRowType(rowType))));
+ projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
return projectStream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
index 75f9717..ba344df 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -149,7 +149,7 @@ public class BeamSortRel extends Sort implements BeamRelNode {
PCollection<BeamSqlRow> orderedStream = rawStream.apply(
"flatten", Flatten.<BeamSqlRow>iterables());
- orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+ orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
return orderedStream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
index 030d2c8..43b74c3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -19,13 +19,12 @@
package org.apache.beam.dsls.sql.rel;
import com.google.common.collect.ImmutableList;
-
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.dsls.sql.schema.BeamTableUtils;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.beam.sdk.transforms.Create;
@@ -65,9 +64,9 @@ public class BeamValuesRel extends Values implements BeamRelNode {
throw new IllegalStateException("Values with empty tuples!");
}
- BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(this.getRowType());
+ BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
for (ImmutableList<RexLiteral> tuple : tuples) {
- BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
+ BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
for (int i = 0; i < tuple.size(); i++) {
BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
}
@@ -75,6 +74,6 @@ public class BeamValuesRel extends Values implements BeamRelNode {
}
return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
- .setCoder(new BeamSqlRowCoder(beamSQLRecordType));
+ .setCoder(new BeamSqlRowCoder(beamSQLRowType));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
index 6d49bcc..dfa2785 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
@@ -23,12 +23,12 @@ import java.io.Serializable;
* Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
*/
public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
- protected BeamSqlRecordType beamSqlRecordType;
- public BaseBeamTable(BeamSqlRecordType beamSqlRecordType) {
- this.beamSqlRecordType = beamSqlRecordType;
+ protected BeamSqlRowType beamSqlRowType;
+ public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
+ this.beamSqlRowType = beamSqlRowType;
}
- @Override public BeamSqlRecordType getRecordType() {
- return beamSqlRecordType;
+ @Override public BeamSqlRowType getRowType() {
+ return beamSqlRowType;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
index 8309097..5b63780 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -31,13 +31,13 @@ public class BeamPCollectionTable extends BaseBeamTable {
private BeamIOType ioType;
private transient PCollection<BeamSqlRow> upstream;
- protected BeamPCollectionTable(BeamSqlRecordType beamSqlRecordType) {
- super(beamSqlRecordType);
+ protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
+ super(beamSqlRowType);
}
public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
- BeamSqlRecordType beamSqlRecordType){
- this(beamSqlRecordType);
+ BeamSqlRowType beamSqlRowType){
+ this(beamSqlRowType);
ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
this.upstream = upstream;
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
deleted file mode 100644
index 52bd652..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Field type information in {@link BeamSqlRow}.
- *
- */
-@AutoValue
-public abstract class BeamSqlRecordType implements Serializable {
- public abstract List<String> getFieldsName();
- public abstract List<Integer> getFieldsType();
-
- public static BeamSqlRecordType create(List<String> fieldNames, List<Integer> fieldTypes) {
- return new org.apache.beam.dsls.sql.schema.AutoValue_BeamSqlRecordType(fieldNames, fieldTypes);
- }
-
- public int size() {
- return getFieldsName().size();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index 5c0dbc0..d789446 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -61,12 +61,12 @@ public class BeamSqlRow implements Serializable {
private List<Integer> nullFields = new ArrayList<>();
private List<Object> dataValues;
- private BeamSqlRecordType dataType;
+ private BeamSqlRowType dataType;
private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
- public BeamSqlRow(BeamSqlRecordType dataType) {
+ public BeamSqlRow(BeamSqlRowType dataType) {
this.dataType = dataType;
this.dataValues = new ArrayList<>();
for (int idx = 0; idx < dataType.size(); ++idx) {
@@ -75,7 +75,7 @@ public class BeamSqlRow implements Serializable {
}
}
- public BeamSqlRow(BeamSqlRecordType dataType, List<Object> dataValues) {
+ public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
this(dataType);
for (int idx = 0; idx < dataValues.size(); ++idx) {
addField(idx, dataValues.get(idx));
@@ -237,11 +237,11 @@ public class BeamSqlRow implements Serializable {
this.dataValues = dataValues;
}
- public BeamSqlRecordType getDataType() {
+ public BeamSqlRowType getDataType() {
return dataType;
}
- public void setDataType(BeamSqlRecordType dataType) {
+ public void setDataType(BeamSqlRowType dataType) {
this.dataType = dataType;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index c798b35..f14864a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
* A {@link Coder} encodes {@link BeamSqlRow}.
*/
public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
- private BeamSqlRecordType tableSchema;
+ private BeamSqlRowType tableSchema;
private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
@@ -52,7 +52,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
private static final ByteCoder byteCoder = ByteCoder.of();
- public BeamSqlRowCoder(BeamSqlRecordType tableSchema) {
+ public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
this.tableSchema = tableSchema;
}
@@ -174,7 +174,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
return record;
}
- public BeamSqlRecordType getTableSchema() {
+ public BeamSqlRowType getTableSchema() {
return tableSchema;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
new file mode 100644
index 0000000..1129bdd
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Field type information in {@link BeamSqlRow}.
+ *
+ */
+@AutoValue
+public abstract class BeamSqlRowType implements Serializable {
+ public abstract List<String> getFieldsName();
+ public abstract List<Integer> getFieldsType();
+
+ public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
+ return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
+ }
+
+ public int size() {
+ return getFieldsName().size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
index 986decb..d419473 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
@@ -48,5 +48,5 @@ public interface BeamSqlTable {
/**
* Get the schema info of the table.
*/
- BeamSqlRecordType getRecordType();
+ BeamSqlRowType getRowType();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
index 7157793..4b7e76b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -37,19 +37,19 @@ public final class BeamTableUtils {
public static BeamSqlRow csvLine2BeamSqlRow(
CSVFormat csvFormat,
String line,
- BeamSqlRecordType beamSqlRecordType) {
- BeamSqlRow row = new BeamSqlRow(beamSqlRecordType);
+ BeamSqlRowType beamSqlRowType) {
+ BeamSqlRow row = new BeamSqlRow(beamSqlRowType);
try (StringReader reader = new StringReader(line)) {
CSVParser parser = csvFormat.parse(reader);
CSVRecord rawRecord = parser.getRecords().get(0);
- if (rawRecord.size() != beamSqlRecordType.size()) {
+ if (rawRecord.size() != beamSqlRowType.size()) {
throw new IllegalArgumentException(String.format(
"Expect %d fields, but actually %d",
- beamSqlRecordType.size(), rawRecord.size()
+ beamSqlRowType.size(), rawRecord.size()
));
} else {
- for (int idx = 0; idx < beamSqlRecordType.size(); idx++) {
+ for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
String raw = rawRecord.get(idx);
addFieldWithAutoTypeCasting(row, idx, raw);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
index 39cf8d8..a18f3de 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -21,9 +21,8 @@ import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
import java.util.List;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -37,27 +36,27 @@ import org.apache.commons.csv.CSVFormat;
*/
public class BeamKafkaCSVTable extends BeamKafkaTable {
private CSVFormat csvFormat;
- public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
+ public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
List<String> topics) {
- this(beamSqlRecordType, bootstrapServers, topics, CSVFormat.DEFAULT);
+ this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
}
- public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
+ public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
List<String> topics, CSVFormat format) {
- super(beamSqlRecordType, bootstrapServers, topics);
+ super(beamSqlRowType, bootstrapServers, topics);
this.csvFormat = format;
}
@Override
public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
getPTransformForInput() {
- return new CsvRecorderDecoder(beamSqlRecordType, csvFormat);
+ return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
}
@Override
public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
getPTransformForOutput() {
- return new CsvRecorderEncoder(beamSqlRecordType, csvFormat);
+ return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
}
/**
@@ -66,10 +65,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
*/
public static class CsvRecorderDecoder
extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
- private BeamSqlRecordType recordType;
+ private BeamSqlRowType rowType;
private CSVFormat format;
- public CsvRecorderDecoder(BeamSqlRecordType recordType, CSVFormat format) {
- this.recordType = recordType;
+ public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) {
+ this.rowType = rowType;
this.format = format;
}
@@ -79,7 +78,7 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
@ProcessElement
public void processElement(ProcessContext c) {
String rowInString = new String(c.element().getValue());
- c.output(csvLine2BeamSqlRow(format, rowInString, recordType));
+ c.output(csvLine2BeamSqlRow(format, rowInString, rowType));
}
}));
}
@@ -91,10 +90,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
*/
public static class CsvRecorderEncoder
extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
- private BeamSqlRecordType recordType;
+ private BeamSqlRowType rowType;
private CSVFormat format;
- public CsvRecorderEncoder(BeamSqlRecordType recordType, CSVFormat format) {
- this.recordType = recordType;
+ public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) {
+ this.rowType = rowType;
this.format = format;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
index f27014e..faa2706 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -22,11 +22,10 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
-
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -49,13 +48,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
private List<String> topics;
private Map<String, Object> configUpdates;
- protected BeamKafkaTable(BeamSqlRecordType beamSqlRecordType) {
- super(beamSqlRecordType);
+ protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) {
+ super(beamSqlRowType);
}
- public BeamKafkaTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
+ public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
List<String> topics) {
- super(beamSqlRecordType);
+ super(beamSqlRowType);
this.bootstrapServers = bootstrapServers;
this.topics = topics;
}
[3/3] beam git commit: This closes #3571
Posted by lz...@apache.org.
This closes #3571
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4d615a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4d615a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4d615a7
Branch: refs/heads/DSL_SQL
Commit: d4d615a7237729f5e59085ae2bd77541c6ea7ddb
Parents: a1f7cf6 a9c8a8a
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Jul 19 10:06:04 2017 +0800
Committer: JingsongLi <lz...@aliyun.com>
Committed: Wed Jul 19 10:06:04 2017 +0800
----------------------------------------------------------------------
.../org/apache/beam/dsls/sql/BeamSqlEnv.java | 12 +++---
.../beam/dsls/sql/example/BeamSqlExample.java | 4 +-
.../interpreter/BeamSqlExpressionExecutor.java | 2 +-
.../dsls/sql/interpreter/BeamSqlFnExecutor.java | 4 +-
.../operator/BeamSqlCaseExpression.java | 8 ++--
.../operator/BeamSqlCastExpression.java | 22 +++++------
.../operator/BeamSqlCompareExpression.java | 6 +--
.../interpreter/operator/BeamSqlExpression.java | 2 +-
.../operator/BeamSqlInputRefExpression.java | 4 +-
.../operator/BeamSqlIsNotNullExpression.java | 4 +-
.../operator/BeamSqlIsNullExpression.java | 4 +-
.../interpreter/operator/BeamSqlPrimitive.java | 2 +-
.../operator/BeamSqlReinterpretExpression.java | 6 +--
.../operator/BeamSqlUdfExpression.java | 4 +-
.../operator/BeamSqlWindowEndExpression.java | 4 +-
.../operator/BeamSqlWindowExpression.java | 4 +-
.../operator/BeamSqlWindowStartExpression.java | 4 +-
.../arithmetic/BeamSqlArithmeticExpression.java | 6 +--
.../date/BeamSqlCurrentDateExpression.java | 2 +-
.../date/BeamSqlCurrentTimeExpression.java | 2 +-
.../date/BeamSqlCurrentTimestampExpression.java | 2 +-
.../date/BeamSqlDateCeilExpression.java | 4 +-
.../date/BeamSqlDateFloorExpression.java | 4 +-
.../operator/date/BeamSqlExtractExpression.java | 4 +-
.../operator/logical/BeamSqlAndExpression.java | 4 +-
.../operator/logical/BeamSqlNotExpression.java | 4 +-
.../operator/logical/BeamSqlOrExpression.java | 4 +-
.../math/BeamSqlMathBinaryExpression.java | 4 +-
.../math/BeamSqlMathUnaryExpression.java | 4 +-
.../operator/math/BeamSqlPiExpression.java | 2 +-
.../string/BeamSqlCharLengthExpression.java | 4 +-
.../string/BeamSqlConcatExpression.java | 6 +--
.../string/BeamSqlInitCapExpression.java | 4 +-
.../operator/string/BeamSqlLowerExpression.java | 4 +-
.../string/BeamSqlOverlayExpression.java | 10 ++---
.../string/BeamSqlPositionExpression.java | 8 ++--
.../string/BeamSqlSubstringExpression.java | 8 ++--
.../operator/string/BeamSqlTrimExpression.java | 10 ++---
.../operator/string/BeamSqlUpperExpression.java | 4 +-
.../beam/dsls/sql/rel/BeamAggregationRel.java | 22 +++++------
.../apache/beam/dsls/sql/rel/BeamFilterRel.java | 2 +-
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 2 +-
.../apache/beam/dsls/sql/rel/BeamJoinRel.java | 12 +++---
.../beam/dsls/sql/rel/BeamProjectRel.java | 4 +-
.../apache/beam/dsls/sql/rel/BeamSortRel.java | 2 +-
.../apache/beam/dsls/sql/rel/BeamValuesRel.java | 9 ++---
.../beam/dsls/sql/schema/BaseBeamTable.java | 10 ++---
.../dsls/sql/schema/BeamPCollectionTable.java | 8 ++--
.../beam/dsls/sql/schema/BeamSqlRecordType.java | 40 --------------------
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 10 ++---
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 6 +--
.../beam/dsls/sql/schema/BeamSqlRowType.java | 40 ++++++++++++++++++++
.../beam/dsls/sql/schema/BeamSqlTable.java | 2 +-
.../beam/dsls/sql/schema/BeamTableUtils.java | 10 ++---
.../sql/schema/kafka/BeamKafkaCSVTable.java | 29 +++++++-------
.../dsls/sql/schema/kafka/BeamKafkaTable.java | 11 +++---
.../dsls/sql/schema/text/BeamTextCSVTable.java | 14 +++----
.../schema/text/BeamTextCSVTableIOReader.java | 11 +++---
.../schema/text/BeamTextCSVTableIOWriter.java | 9 ++---
.../dsls/sql/schema/text/BeamTextTable.java | 6 +--
.../transform/BeamAggregationTransforms.java | 26 ++++++-------
.../dsls/sql/transform/BeamJoinTransforms.java | 6 +--
.../dsls/sql/transform/BeamSqlProjectFn.java | 16 ++++----
.../beam/dsls/sql/utils/CalciteUtils.java | 12 +++---
.../dsls/sql/BeamSqlDslAggregationTest.java | 14 +++----
.../apache/beam/dsls/sql/BeamSqlDslBase.java | 26 ++++++-------
.../beam/dsls/sql/BeamSqlDslJoinTest.java | 10 ++---
.../beam/dsls/sql/BeamSqlDslProjectTest.java | 10 ++---
.../beam/dsls/sql/BeamSqlDslUdfUdafTest.java | 6 +--
.../org/apache/beam/dsls/sql/TestUtils.java | 28 +++++++-------
...mSqlBuiltinFunctionsIntegrationTestBase.java | 6 +--
.../interpreter/BeamSqlFnExecutorTestBase.java | 8 ++--
.../beam/dsls/sql/mock/MockedBoundedTable.java | 14 +++----
.../apache/beam/dsls/sql/mock/MockedTable.java | 6 +--
.../dsls/sql/mock/MockedUnboundedTable.java | 14 +++----
.../dsls/sql/schema/BeamSqlRowCoderTest.java | 6 +--
.../sql/schema/kafka/BeamKafkaCSVTableTest.java | 7 ++--
.../sql/schema/text/BeamTextCSVTableTest.java | 17 ++++-----
.../transform/BeamAggregationTransformTest.java | 13 +++----
.../schema/transform/BeamTransformBaseTest.java | 12 +++---
80 files changed, 354 insertions(+), 362 deletions(-)
----------------------------------------------------------------------