You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/09 02:08:58 UTC
[1/2] beam git commit: [BEAM-2730] make BeamRecord an immutable type
Repository: beam
Updated Branches:
refs/heads/DSL_SQL 926c70a34 -> 66286749f
[BEAM-2730] make BeamRecord an immutable type
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa36b128
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa36b128
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa36b128
Branch: refs/heads/DSL_SQL
Commit: fa36b1285a4d8fc967b6302456d0f7fcf7943894
Parents: 926c70a
Author: mingmxu <mi...@ebay.com>
Authored: Mon Aug 7 16:12:21 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Tue Aug 8 19:07:03 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/BeamRecordCoder.java | 10 +-
.../org/apache/beam/sdk/values/BeamRecord.java | 24 ++---
.../extensions/sql/example/BeamSqlExample.java | 5 +-
.../extensions/sql/impl/rel/BeamJoinRel.java | 7 +-
.../extensions/sql/impl/rel/BeamValuesRel.java | 7 +-
.../transform/BeamAggregationTransforms.java | 26 +++--
.../sql/impl/transform/BeamJoinTransforms.java | 20 ++--
.../sql/impl/transform/BeamSqlProjectFn.java | 9 +-
.../sql/schema/BeamSqlRecordType.java | 5 +-
.../extensions/sql/schema/BeamTableUtils.java | 41 +++----
.../sql/BeamSqlDslAggregationTest.java | 107 ++++---------------
.../beam/sdk/extensions/sql/BeamSqlDslBase.java | 56 +++-------
.../extensions/sql/BeamSqlDslProjectTest.java | 48 ++++-----
.../extensions/sql/BeamSqlDslUdfUdafTest.java | 8 +-
.../beam/sdk/extensions/sql/TestUtils.java | 6 +-
.../interpreter/BeamSqlFnExecutorTestBase.java | 8 +-
.../sql/schema/BeamSqlRowCoderTest.java | 16 +--
.../sql/schema/kafka/BeamKafkaCSVTableTest.java | 12 +--
18 files changed, 132 insertions(+), 283 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index a6200f6..4e24b82 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.coders;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
@@ -69,14 +70,15 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
public BeamRecord decode(InputStream inStream) throws CoderException, IOException {
BitSet nullFields = nullListCoder.decode(inStream);
- BeamRecord record = new BeamRecord(recordType);
+ List<Object> fieldValues = new ArrayList<>(recordType.size());
for (int idx = 0; idx < recordType.size(); ++idx) {
if (nullFields.get(idx)) {
- continue;
+ fieldValues.add(null);
+ } else {
+ fieldValues.add(coderArray.get(idx).decode(inStream));
}
-
- record.addField(idx, coderArray.get(idx).decode(inStream));
}
+ BeamRecord record = new BeamRecord(recordType, fieldValues);
return record;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index 35a96f6..6e4bd4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.values;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
@@ -32,29 +33,28 @@ import org.apache.beam.sdk.annotations.Experimental;
*/
@Experimental
public class BeamRecord implements Serializable {
+ //immutable list of field values.
private List<Object> dataValues;
private BeamRecordType dataType;
- public BeamRecord(BeamRecordType dataType) {
+ public BeamRecord(BeamRecordType dataType, List<Object> rawdataValues) {
this.dataType = dataType;
- this.dataValues = new ArrayList<>();
+ this.dataValues = new ArrayList<>(dataType.size());
+
for (int idx = 0; idx < dataType.size(); ++idx) {
dataValues.add(null);
}
- }
- public BeamRecord(BeamRecordType dataType, List<Object> dataValues) {
- this(dataType);
- for (int idx = 0; idx < dataValues.size(); ++idx) {
- addField(idx, dataValues.get(idx));
+ for (int idx = 0; idx < dataType.size(); ++idx) {
+ addField(idx, rawdataValues.get(idx));
}
}
- public void addField(String fieldName, Object fieldValue) {
- addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+ public BeamRecord(BeamRecordType dataType, Object... rawdataValues) {
+ this(dataType, Arrays.asList(rawdataValues));
}
- public void addField(int index, Object fieldValue) {
+ private void addField(int index, Object fieldValue) {
dataType.validateValueType(index, fieldValue);
dataValues.set(index, fieldValue);
}
@@ -163,10 +163,6 @@ public class BeamRecord implements Serializable {
return dataValues;
}
- public void setDataValues(List<Object> dataValues) {
- this.dataValues = dataValues;
- }
-
public BeamRecordType getDataType() {
return dataType;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index fbc1fd8..acb5943 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -54,10 +54,7 @@ class BeamSqlExample {
List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
- BeamRecord row = new BeamRecord(type);
- row.addField(0, 1);
- row.addField(1, "row");
- row.addField(2, 1.0);
+ BeamRecord row = new BeamRecord(type, 1, "row", 1.0);
//create a source PCollection with Create.of();
PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row)
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 9e5ce2f..2bd15b3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -19,6 +19,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -255,11 +256,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
private BeamRecord buildNullRow(BeamRelNode relNode) {
BeamSqlRecordType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
- BeamRecord nullRow = new BeamRecord(leftType);
- for (int i = 0; i < leftType.size(); i++) {
- nullRow.addField(i, null);
- }
- return nullRow;
+ return new BeamRecord(leftType, Collections.nCopies(leftType.size(), null));
}
private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index 8ad6e8d..1d666ca 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -65,11 +65,12 @@ public class BeamValuesRel extends Values implements BeamRelNode {
BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
for (ImmutableList<RexLiteral> tuple : tuples) {
- BeamRecord row = new BeamRecord(beamSQLRowType);
+ List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.size());
for (int i = 0; i < tuple.size(); i++) {
- BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
+ fieldsValue.add(BeamTableUtils.autoCastField(
+ beamSQLRowType.getFieldsType().get(i), tuple.get(i).getValue()));
}
- rows.add(row);
+ rows.add(new BeamRecord(beamSQLRowType, fieldsValue));
}
return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index ce5444f..c6a5d26 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -75,19 +75,15 @@ public class BeamAggregationTransforms implements Serializable{
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
- BeamRecord outRecord = new BeamRecord(outRowType);
-
+ List<Object> fieldValues = new ArrayList<>();
KV<BeamRecord, BeamRecord> kvRecord = c.element();
- for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
- outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
- }
- for (int idx = 0; idx < aggFieldNames.size(); ++idx) {
- outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx));
- }
+ fieldValues.addAll(kvRecord.getKey().getDataValues());
+ fieldValues.addAll(kvRecord.getValue().getDataValues());
if (windowStartFieldIdx != -1) {
- outRecord.addField(windowStartFieldIdx, ((IntervalWindow) window).start().toDate());
+ fieldValues.add(windowStartFieldIdx, ((IntervalWindow) window).start().toDate());
}
+ BeamRecord outRecord = new BeamRecord(outRowType, fieldValues);
c.output(outRecord);
}
}
@@ -111,11 +107,13 @@ public class BeamAggregationTransforms implements Serializable{
@Override
public BeamRecord apply(BeamRecord input) {
BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input));
- BeamRecord keyOfRecord = new BeamRecord(typeOfKey);
+ List<Object> fieldValues = new ArrayList<>(groupByKeys.size());
for (int idx = 0; idx < groupByKeys.size(); ++idx) {
- keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
+ fieldValues.add(input.getFieldValue(groupByKeys.get(idx)));
}
+
+ BeamRecord keyOfRecord = new BeamRecord(typeOfKey, fieldValues);
return keyOfRecord;
}
@@ -241,11 +239,11 @@ public class BeamAggregationTransforms implements Serializable{
}
@Override
public BeamRecord extractOutput(AggregationAccumulator accumulator) {
- BeamRecord result = new BeamRecord(finalRowType);
+ List<Object> fieldValues = new ArrayList<>(aggregators.size());
for (int idx = 0; idx < aggregators.size(); ++idx) {
- result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
+ fieldValues.add(aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
}
- return result;
+ return new BeamRecord(finalRowType, fieldValues);
}
@Override
public Coder<AggregationAccumulator> getAccumulatorCoder(
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 105bbf3..8f34704 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -66,12 +66,12 @@ public class BeamJoinTransforms {
BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
// build the row
- BeamRecord row = new BeamRecord(type);
+ List<Object> fieldValues = new ArrayList<>(joinColumns.size());
for (int i = 0; i < joinColumns.size(); i++) {
- row.addField(i, input
+ fieldValues.add(input
.getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue()));
}
- return KV.of(row, input);
+ return KV.of(new BeamRecord(type, fieldValues), input);
}
}
@@ -154,16 +154,8 @@ public class BeamJoinTransforms {
types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldsType());
BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
- BeamRecord row = new BeamRecord(type);
- // build the row
- for (int i = 0; i < leftRow.size(); i++) {
- row.addField(i, leftRow.getFieldValue(i));
- }
-
- for (int i = 0; i < rightRow.size(); i++) {
- row.addField(i + leftRow.size(), rightRow.getFieldValue(i));
- }
-
- return row;
+ List<Object> fieldValues = new ArrayList<>(leftRow.getDataValues());
+ fieldValues.addAll(rightRow.getDataValues());
+ return new BeamRecord(type, fieldValues);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
index 45dc621..34d6dbb 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.transform;
+import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
@@ -53,12 +54,12 @@ public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> {
public void processElement(ProcessContext c, BoundedWindow window) {
BeamRecord inputRow = c.element();
List<Object> results = executor.execute(inputRow, window);
-
- BeamRecord outRow = new BeamRecord(outputRowType);
-
+ List<Object> fieldsValue = new ArrayList<>(results.size());
for (int idx = 0; idx < results.size(); ++idx) {
- BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));
+ fieldsValue.add(
+ BeamTableUtils.autoCastField(outputRowType.getFieldsType().get(idx), results.get(idx)));
}
+ BeamRecord outRow = new BeamRecord(outputRowType, fieldsValue);
c.output(outRow);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
index fe82834..b7c7438 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
@@ -84,7 +84,10 @@ public class BeamSqlRecordType extends BeamRecordType {
public static BeamSqlRecordType create(List<String> fieldNames,
List<Integer> fieldTypes) {
- List<Coder> fieldCoders = new ArrayList<>();
+ if (fieldNames.size() != fieldTypes.size()) {
+ throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match.");
+ }
+ List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
for (int idx = 0; idx < fieldTypes.size(); ++idx) {
switch (fieldTypes.get(idx)) {
case Types.INTEGER:
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
index 63c9720..19d3e39 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -39,7 +41,7 @@ public final class BeamTableUtils {
CSVFormat csvFormat,
String line,
BeamSqlRecordType beamSqlRowType) {
- BeamRecord row = new BeamRecord(beamSqlRowType);
+ List<Object> fieldsValue = new ArrayList<>(beamSqlRowType.size());
try (StringReader reader = new StringReader(line)) {
CSVParser parser = csvFormat.parse(reader);
CSVRecord rawRecord = parser.getRecords().get(0);
@@ -52,13 +54,13 @@ public final class BeamTableUtils {
} else {
for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
String raw = rawRecord.get(idx);
- addFieldWithAutoTypeCasting(row, idx, raw);
+ fieldsValue.add(autoCastField(beamSqlRowType.getFieldsType().get(idx), raw));
}
}
} catch (IOException e) {
throw new IllegalArgumentException("decodeRecord failed!", e);
}
- return row;
+ return new BeamRecord(beamSqlRowType, fieldsValue);
}
public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
@@ -74,37 +76,29 @@ public final class BeamTableUtils {
return writer.toString();
}
- public static void addFieldWithAutoTypeCasting(BeamRecord row, int idx, Object rawObj) {
+ public static Object autoCastField(int fieldType, Object rawObj) {
if (rawObj == null) {
- row.addField(idx, null);
- return;
+ return null;
}
- SqlTypeName columnType = CalciteUtils.getFieldType(BeamSqlRecordHelper.getSqlRecordType(row)
- , idx);
+ SqlTypeName columnType = CalciteUtils.toCalciteType(fieldType);
// auto-casting for numberics
if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
|| (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
String raw = rawObj.toString();
switch (columnType) {
case TINYINT:
- row.addField(idx, Byte.valueOf(raw));
- break;
+ return Byte.valueOf(raw);
case SMALLINT:
- row.addField(idx, Short.valueOf(raw));
- break;
+ return Short.valueOf(raw);
case INTEGER:
- row.addField(idx, Integer.valueOf(raw));
- break;
+ return Integer.valueOf(raw);
case BIGINT:
- row.addField(idx, Long.valueOf(raw));
- break;
+ return Long.valueOf(raw);
case FLOAT:
- row.addField(idx, Float.valueOf(raw));
- break;
+ return Float.valueOf(raw);
case DOUBLE:
- row.addField(idx, Double.valueOf(raw));
- break;
+ return Double.valueOf(raw);
default:
throw new UnsupportedOperationException(
String.format("Column type %s is not supported yet!", columnType));
@@ -112,13 +106,12 @@ public final class BeamTableUtils {
} else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
// convert NlsString to String
if (rawObj instanceof NlsString) {
- row.addField(idx, ((NlsString) rawObj).getValue());
+ return ((NlsString) rawObj).getValue();
} else {
- row.addField(idx, rawObj);
+ return rawObj;
}
} else {
- // keep the origin
- row.addField(idx, rawObj);
+ return rawObj;
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 71278ec..19ca398 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -57,9 +57,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
- BeamRecord record = new BeamRecord(resultType);
- record.addField("f_int2", 0);
- record.addField("size", 4L);
+ BeamRecord record = new BeamRecord(resultType, 0, 4L);
PAssert.that(result).containsInAnyOrder(record);
@@ -107,37 +105,14 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
Types.TIMESTAMP, Types.TIMESTAMP));
- BeamRecord record = new BeamRecord(resultType);
- record.addField("f_int2", 0);
- record.addField("size", 4L);
-
- record.addField("sum1", 10000L);
- record.addField("avg1", 2500L);
- record.addField("max1", 4000L);
- record.addField("min1", 1000L);
-
- record.addField("sum2", (short) 10);
- record.addField("avg2", (short) 2);
- record.addField("max2", (short) 4);
- record.addField("min2", (short) 1);
-
- record.addField("sum3", (byte) 10);
- record.addField("avg3", (byte) 2);
- record.addField("max3", (byte) 4);
- record.addField("min3", (byte) 1);
-
- record.addField("sum4", 10.0F);
- record.addField("avg4", 2.5F);
- record.addField("max4", 4.0F);
- record.addField("min4", 1.0F);
-
- record.addField("sum5", 10.0);
- record.addField("avg5", 2.5);
- record.addField("max5", 4.0);
- record.addField("min5", 1.0);
-
- record.addField("max6", FORMAT.parse("2017-01-01 02:04:03"));
- record.addField("min6", FORMAT.parse("2017-01-01 01:01:03"));
+ BeamRecord record = new BeamRecord(resultType
+ , 0, 4L
+ , 10000L, 2500L, 4000L, 1000L
+ , (short) 10, (short) 2, (short) 4, (short) 1
+ , (byte) 10, (byte) 2, (byte) 4, (byte) 1
+ , 10.0F, 2.5F, 4.0F, 1.0F
+ , 10.0, 2.5, 4.0, 1.0
+ , FORMAT.parse("2017-01-01 02:04:03"), FORMAT.parse("2017-01-01 01:01:03"));
PAssert.that(result).containsInAnyOrder(record);
@@ -169,21 +144,10 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
- BeamRecord record1 = new BeamRecord(resultType);
- record1.addField("f_int", 1);
- record1.addField("f_long", 1000L);
-
- BeamRecord record2 = new BeamRecord(resultType);
- record2.addField("f_int", 2);
- record2.addField("f_long", 2000L);
-
- BeamRecord record3 = new BeamRecord(resultType);
- record3.addField("f_int", 3);
- record3.addField("f_long", 3000L);
-
- BeamRecord record4 = new BeamRecord(resultType);
- record4.addField("f_int", 4);
- record4.addField("f_long", 4000L);
+ BeamRecord record1 = new BeamRecord(resultType, 1, 1000L);
+ BeamRecord record2 = new BeamRecord(resultType, 2, 2000L);
+ BeamRecord record3 = new BeamRecord(resultType, 3, 3000L);
+ BeamRecord record4 = new BeamRecord(resultType, 4, 4000L);
PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
@@ -219,15 +183,8 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
- BeamRecord record1 = new BeamRecord(resultType);
- record1.addField("f_int2", 0);
- record1.addField("size", 3L);
- record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-
- BeamRecord record2 = new BeamRecord(resultType);
- record2.addField("f_int2", 0);
- record2.addField("size", 1L);
- record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
+ BeamRecord record1 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:00:00"));
+ BeamRecord record2 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:00:00"));
PAssert.that(result).containsInAnyOrder(record1, record2);
@@ -262,25 +219,10 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
- BeamRecord record1 = new BeamRecord(resultType);
- record1.addField("f_int2", 0);
- record1.addField("size", 3L);
- record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00"));
-
- BeamRecord record2 = new BeamRecord(resultType);
- record2.addField("f_int2", 0);
- record2.addField("size", 3L);
- record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-
- BeamRecord record3 = new BeamRecord(resultType);
- record3.addField("f_int2", 0);
- record3.addField("size", 1L);
- record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00"));
-
- BeamRecord record4 = new BeamRecord(resultType);
- record4.addField("f_int2", 0);
- record4.addField("size", 1L);
- record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
+ BeamRecord record1 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 00:30:00"));
+ BeamRecord record2 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:00:00"));
+ BeamRecord record3 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 01:30:00"));
+ BeamRecord record4 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:00:00"));
PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
@@ -316,15 +258,8 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
- BeamRecord record1 = new BeamRecord(resultType);
- record1.addField("f_int2", 0);
- record1.addField("size", 3L);
- record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03"));
-
- BeamRecord record2 = new BeamRecord(resultType);
- record2.addField("f_int2", 0);
- record2.addField("size", 1L);
- record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03"));
+ BeamRecord record1 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:01:03"));
+ BeamRecord record2 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:04:03"));
PAssert.that(result).containsInAnyOrder(record1, record2);
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index d09caf0..02427ae 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -112,56 +112,24 @@ public class BeamSqlDslBase {
private static List<BeamRecord> prepareInputRowsInTableA() throws ParseException{
List<BeamRecord> rows = new ArrayList<>();
- BeamRecord row1 = new BeamRecord(rowTypeInTableA);
- row1.addField(0, 1);
- row1.addField(1, 1000L);
- row1.addField(2, Short.valueOf("1"));
- row1.addField(3, Byte.valueOf("1"));
- row1.addField(4, 1.0f);
- row1.addField(5, 1.0);
- row1.addField(6, "string_row1");
- row1.addField(7, FORMAT.parse("2017-01-01 01:01:03"));
- row1.addField(8, 0);
- row1.addField(9, new BigDecimal(1));
+ BeamRecord row1 = new BeamRecord(rowTypeInTableA
+ , 1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0f, 1.0, "string_row1"
+ , FORMAT.parse("2017-01-01 01:01:03"), 0, new BigDecimal(1));
rows.add(row1);
- BeamRecord row2 = new BeamRecord(rowTypeInTableA);
- row2.addField(0, 2);
- row2.addField(1, 2000L);
- row2.addField(2, Short.valueOf("2"));
- row2.addField(3, Byte.valueOf("2"));
- row2.addField(4, 2.0f);
- row2.addField(5, 2.0);
- row2.addField(6, "string_row2");
- row2.addField(7, FORMAT.parse("2017-01-01 01:02:03"));
- row2.addField(8, 0);
- row2.addField(9, new BigDecimal(2));
+ BeamRecord row2 = new BeamRecord(rowTypeInTableA
+ , 2, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0f, 2.0, "string_row2"
+ , FORMAT.parse("2017-01-01 01:02:03"), 0, new BigDecimal(2));
rows.add(row2);
- BeamRecord row3 = new BeamRecord(rowTypeInTableA);
- row3.addField(0, 3);
- row3.addField(1, 3000L);
- row3.addField(2, Short.valueOf("3"));
- row3.addField(3, Byte.valueOf("3"));
- row3.addField(4, 3.0f);
- row3.addField(5, 3.0);
- row3.addField(6, "string_row3");
- row3.addField(7, FORMAT.parse("2017-01-01 01:06:03"));
- row3.addField(8, 0);
- row3.addField(9, new BigDecimal(3));
+ BeamRecord row3 = new BeamRecord(rowTypeInTableA
+ , 3, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0f, 3.0, "string_row3"
+ , FORMAT.parse("2017-01-01 01:06:03"), 0, new BigDecimal(3));
rows.add(row3);
- BeamRecord row4 = new BeamRecord(rowTypeInTableA);
- row4.addField(0, 4);
- row4.addField(1, 4000L);
- row4.addField(2, Short.valueOf("4"));
- row4.addField(3, Byte.valueOf("4"));
- row4.addField(4, 4.0f);
- row4.addField(5, 4.0);
- row4.addField(6, "string_row4");
- row4.addField(7, FORMAT.parse("2017-01-01 02:04:03"));
- row4.addField(8, 0);
- row4.addField(9, new BigDecimal(4));
+ BeamRecord row4 = new BeamRecord(rowTypeInTableA
+ , 4, 4000L, Short.valueOf("4"), Byte.valueOf("4"), 4.0f, 4.0, "string_row4"
+ , FORMAT.parse("2017-01-01 02:04:03"), 0, new BigDecimal(4));
rows.add(row4);
return rows;
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index ddb90d5..c8041a8 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -84,9 +84,8 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
- BeamRecord record = new BeamRecord(resultType);
- record.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
- record.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+ BeamRecord record = new BeamRecord(resultType
+ , recordsInTableA.get(0).getFieldValue(0), recordsInTableA.get(0).getFieldValue(1));
PAssert.that(result).containsInAnyOrder(record);
@@ -119,21 +118,17 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
- BeamRecord record1 = new BeamRecord(resultType);
- record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
- record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+ BeamRecord record1 = new BeamRecord(resultType
+ , recordsInTableA.get(0).getFieldValue(0), recordsInTableA.get(0).getFieldValue(1));
- BeamRecord record2 = new BeamRecord(resultType);
- record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
- record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+ BeamRecord record2 = new BeamRecord(resultType
+ , recordsInTableA.get(1).getFieldValue(0), recordsInTableA.get(1).getFieldValue(1));
- BeamRecord record3 = new BeamRecord(resultType);
- record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
- record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+ BeamRecord record3 = new BeamRecord(resultType
+ , recordsInTableA.get(2).getFieldValue(0), recordsInTableA.get(2).getFieldValue(1));
- BeamRecord record4 = new BeamRecord(resultType);
- record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
- record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+ BeamRecord record4 = new BeamRecord(resultType
+ , recordsInTableA.get(3).getFieldValue(0), recordsInTableA.get(3).getFieldValue(1));
PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
@@ -166,21 +161,17 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
- BeamRecord record1 = new BeamRecord(resultType);
- record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
- record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+ BeamRecord record1 = new BeamRecord(resultType
+ , recordsInTableA.get(0).getFieldValue(0), recordsInTableA.get(0).getFieldValue(1));
- BeamRecord record2 = new BeamRecord(resultType);
- record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
- record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+ BeamRecord record2 = new BeamRecord(resultType
+ , recordsInTableA.get(1).getFieldValue(0), recordsInTableA.get(1).getFieldValue(1));
- BeamRecord record3 = new BeamRecord(resultType);
- record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
- record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+ BeamRecord record3 = new BeamRecord(resultType
+ , recordsInTableA.get(2).getFieldValue(0), recordsInTableA.get(2).getFieldValue(1));
- BeamRecord record4 = new BeamRecord(resultType);
- record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
- record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+ BeamRecord record4 = new BeamRecord(resultType
+ , recordsInTableA.get(3).getFieldValue(0), recordsInTableA.get(3).getFieldValue(1));
PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
@@ -213,8 +204,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
Arrays.asList(Types.INTEGER));
- BeamRecord record = new BeamRecord(resultType);
- record.addField("literal_field", 1);
+ BeamRecord record = new BeamRecord(resultType, 1);
PAssert.that(result).containsInAnyOrder(record);
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index e3c6aec..25e76e9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -42,9 +42,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"),
Arrays.asList(Types.INTEGER, Types.INTEGER));
- BeamRecord record = new BeamRecord(resultType);
- record.addField("f_int2", 0);
- record.addField("squaresum", 30);
+ BeamRecord record = new BeamRecord(resultType, 0, 30);
String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`"
+ " FROM PCOLLECTION GROUP BY f_int2";
@@ -72,9 +70,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"),
Arrays.asList(Types.INTEGER, Types.INTEGER));
- BeamRecord record = new BeamRecord(resultType);
- record.addField("f_int", 2);
- record.addField("cubicvalue", 8);
+ BeamRecord record = new BeamRecord(resultType, 2, 8);
String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
PCollection<BeamRecord> result1 =
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 63b6ca8..e9dc88f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -184,11 +184,7 @@ public class TestUtils {
int fieldCount = type.size();
for (int i = 0; i < args.size(); i += fieldCount) {
- BeamRecord row = new BeamRecord(type);
- for (int j = 0; j < fieldCount; j++) {
- row.addField(j, args.get(i + j));
- }
- rows.add(row);
+ rows.add(new BeamRecord(type, args.subList(i, i + fieldCount)));
}
return rows;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index 4da7790..86e2ca4 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -71,12 +71,8 @@ public class BeamSqlFnExecutorTestBase {
.add("order_time", SqlTypeName.BIGINT).build();
beamRowType = CalciteUtils.toBeamRowType(relDataType);
- record = new BeamRecord(beamRowType);
-
- record.addField(0, 1234567L);
- record.addField(1, 0);
- record.addField(2, 8.9);
- record.addField(3, 1234567L);
+ record = new BeamRecord(beamRowType
+ , 1234567L, 0, 8.9, 1234567L);
SchemaPlus schema = Frameworks.createRootSchema(true);
final List<RelTraitDef> traitDefs = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
index 08f98c3..7492434 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
@@ -62,20 +62,12 @@ public class BeamSqlRowCoderTest {
BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(
protoRowType.apply(new JavaTypeFactoryImpl(
RelDataTypeSystem.DEFAULT)));
- BeamRecord row = new BeamRecord(beamSQLRowType);
- row.addField("col_tinyint", Byte.valueOf("1"));
- row.addField("col_smallint", Short.valueOf("1"));
- row.addField("col_integer", 1);
- row.addField("col_bigint", 1L);
- row.addField("col_float", 1.1F);
- row.addField("col_double", 1.1);
- row.addField("col_decimal", BigDecimal.ZERO);
- row.addField("col_string_varchar", "hello");
+
GregorianCalendar calendar = new GregorianCalendar();
calendar.setTime(new Date());
- row.addField("col_time", calendar);
- row.addField("col_timestamp", new Date());
- row.addField("col_boolean", true);
+ BeamRecord row = new BeamRecord(beamSQLRowType
+ , Byte.valueOf("1"), Short.valueOf("1"), 1, 1L, 1.1F, 1.1
+ , BigDecimal.ZERO, "hello", calendar, new Date(), true);
BeamRecordCoder coder = beamSQLRowType.getRecordCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
index 2fc013d..cb6121a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
@@ -45,18 +45,14 @@ import org.junit.Test;
public class BeamKafkaCSVTableTest {
@Rule
public TestPipeline pipeline = TestPipeline.create();
- public static BeamRecord row1 = new BeamRecord(genRowType());
- public static BeamRecord row2 = new BeamRecord(genRowType());
+ public static BeamRecord row1;
+ public static BeamRecord row2;
@BeforeClass
public static void setUp() {
- row1.addField(0, 1L);
- row1.addField(1, 1);
- row1.addField(2, 1.0);
+ row1 = new BeamRecord(genRowType(), 1L, 1, 1.0);
- row2.addField(0, 2L);
- row2.addField(1, 2);
- row2.addField(2, 2.0);
+ row2 = new BeamRecord(genRowType(), 2L, 2, 2.0);
}
@Test public void testCsvRecorderDecoder() throws Exception {
[2/2] beam git commit: [BEAM-2730] This closes #3692
Posted by ta...@apache.org.
[BEAM-2730] This closes #3692
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/66286749
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/66286749
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/66286749
Branch: refs/heads/DSL_SQL
Commit: 66286749f2d64eb3f25d99b4d9bf56c49d9f62b6
Parents: 926c70a fa36b12
Author: Tyler Akidau <ta...@apache.org>
Authored: Tue Aug 8 19:07:56 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Tue Aug 8 19:07:56 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/BeamRecordCoder.java | 10 +-
.../org/apache/beam/sdk/values/BeamRecord.java | 24 ++---
.../extensions/sql/example/BeamSqlExample.java | 5 +-
.../extensions/sql/impl/rel/BeamJoinRel.java | 7 +-
.../extensions/sql/impl/rel/BeamValuesRel.java | 7 +-
.../transform/BeamAggregationTransforms.java | 26 +++--
.../sql/impl/transform/BeamJoinTransforms.java | 20 ++--
.../sql/impl/transform/BeamSqlProjectFn.java | 9 +-
.../sql/schema/BeamSqlRecordType.java | 5 +-
.../extensions/sql/schema/BeamTableUtils.java | 41 +++----
.../sql/BeamSqlDslAggregationTest.java | 107 ++++---------------
.../beam/sdk/extensions/sql/BeamSqlDslBase.java | 56 +++-------
.../extensions/sql/BeamSqlDslProjectTest.java | 48 ++++-----
.../extensions/sql/BeamSqlDslUdfUdafTest.java | 8 +-
.../beam/sdk/extensions/sql/TestUtils.java | 6 +-
.../interpreter/BeamSqlFnExecutorTestBase.java | 8 +-
.../sql/schema/BeamSqlRowCoderTest.java | 16 +--
.../sql/schema/kafka/BeamKafkaCSVTableTest.java | 12 +--
18 files changed, 132 insertions(+), 283 deletions(-)
----------------------------------------------------------------------