You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/09 02:08:58 UTC

[1/2] beam git commit: [BEAM-2730] make BeamRecord an immutable type

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 926c70a34 -> 66286749f


[BEAM-2730] make BeamRecord an immutable type


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa36b128
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa36b128
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa36b128

Branch: refs/heads/DSL_SQL
Commit: fa36b1285a4d8fc967b6302456d0f7fcf7943894
Parents: 926c70a
Author: mingmxu <mi...@ebay.com>
Authored: Mon Aug 7 16:12:21 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Tue Aug 8 19:07:03 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/BeamRecordCoder.java |  10 +-
 .../org/apache/beam/sdk/values/BeamRecord.java  |  24 ++---
 .../extensions/sql/example/BeamSqlExample.java  |   5 +-
 .../extensions/sql/impl/rel/BeamJoinRel.java    |   7 +-
 .../extensions/sql/impl/rel/BeamValuesRel.java  |   7 +-
 .../transform/BeamAggregationTransforms.java    |  26 +++--
 .../sql/impl/transform/BeamJoinTransforms.java  |  20 ++--
 .../sql/impl/transform/BeamSqlProjectFn.java    |   9 +-
 .../sql/schema/BeamSqlRecordType.java           |   5 +-
 .../extensions/sql/schema/BeamTableUtils.java   |  41 +++----
 .../sql/BeamSqlDslAggregationTest.java          | 107 ++++---------------
 .../beam/sdk/extensions/sql/BeamSqlDslBase.java |  56 +++-------
 .../extensions/sql/BeamSqlDslProjectTest.java   |  48 ++++-----
 .../extensions/sql/BeamSqlDslUdfUdafTest.java   |   8 +-
 .../beam/sdk/extensions/sql/TestUtils.java      |   6 +-
 .../interpreter/BeamSqlFnExecutorTestBase.java  |   8 +-
 .../sql/schema/BeamSqlRowCoderTest.java         |  16 +--
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java |  12 +--
 18 files changed, 132 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index a6200f6..4e24b82 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.coders;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -69,14 +70,15 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
   public BeamRecord decode(InputStream inStream) throws CoderException, IOException {
     BitSet nullFields = nullListCoder.decode(inStream);
 
-    BeamRecord record = new BeamRecord(recordType);
+    List<Object> fieldValues = new ArrayList<>(recordType.size());
     for (int idx = 0; idx < recordType.size(); ++idx) {
       if (nullFields.get(idx)) {
-        continue;
+        fieldValues.add(null);
+      } else {
+        fieldValues.add(coderArray.get(idx).decode(inStream));
       }
-
-      record.addField(idx, coderArray.get(idx).decode(inStream));
     }
+    BeamRecord record = new BeamRecord(recordType, fieldValues);
 
     return record;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index 35a96f6..6e4bd4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.values;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
@@ -32,29 +33,28 @@ import org.apache.beam.sdk.annotations.Experimental;
  */
 @Experimental
 public class BeamRecord implements Serializable {
+  //immutable list of field values.
   private List<Object> dataValues;
   private BeamRecordType dataType;
 
-  public BeamRecord(BeamRecordType dataType) {
+  public BeamRecord(BeamRecordType dataType, List<Object> rawdataValues) {
     this.dataType = dataType;
-    this.dataValues = new ArrayList<>();
+    this.dataValues = new ArrayList<>(dataType.size());
+
     for (int idx = 0; idx < dataType.size(); ++idx) {
       dataValues.add(null);
     }
-  }
 
-  public BeamRecord(BeamRecordType dataType, List<Object> dataValues) {
-    this(dataType);
-    for (int idx = 0; idx < dataValues.size(); ++idx) {
-      addField(idx, dataValues.get(idx));
+    for (int idx = 0; idx < dataType.size(); ++idx) {
+      addField(idx, rawdataValues.get(idx));
     }
   }
 
-  public void addField(String fieldName, Object fieldValue) {
-    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+  public BeamRecord(BeamRecordType dataType, Object... rawdataValues) {
+    this(dataType, Arrays.asList(rawdataValues));
   }
 
-  public void addField(int index, Object fieldValue) {
+  private void addField(int index, Object fieldValue) {
     dataType.validateValueType(index, fieldValue);
     dataValues.set(index, fieldValue);
   }
@@ -163,10 +163,6 @@ public class BeamRecord implements Serializable {
     return dataValues;
   }
 
-  public void setDataValues(List<Object> dataValues) {
-    this.dataValues = dataValues;
-  }
-
   public BeamRecordType getDataType() {
     return dataType;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index fbc1fd8..acb5943 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -54,10 +54,7 @@ class BeamSqlExample {
     List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
     List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
     BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
-    BeamRecord row = new BeamRecord(type);
-    row.addField(0, 1);
-    row.addField(1, "row");
-    row.addField(2, 1.0);
+    BeamRecord row = new BeamRecord(type, 1, "row", 1.0);
 
     //create a source PCollection with Create.of();
     PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row)

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 9e5ce2f..2bd15b3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -19,6 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -255,11 +256,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
 
   private BeamRecord buildNullRow(BeamRelNode relNode) {
     BeamSqlRecordType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
-    BeamRecord nullRow = new BeamRecord(leftType);
-    for (int i = 0; i < leftType.size(); i++) {
-      nullRow.addField(i, null);
-    }
-    return nullRow;
+    return new BeamRecord(leftType, Collections.nCopies(leftType.size(), null));
   }
 
   private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index 8ad6e8d..1d666ca 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -65,11 +65,12 @@ public class BeamValuesRel extends Values implements BeamRelNode {
 
     BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
     for (ImmutableList<RexLiteral> tuple : tuples) {
-      BeamRecord row = new BeamRecord(beamSQLRowType);
+      List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.size());
       for (int i = 0; i < tuple.size(); i++) {
-        BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
+        fieldsValue.add(BeamTableUtils.autoCastField(
+            beamSQLRowType.getFieldsType().get(i), tuple.get(i).getValue()));
       }
-      rows.add(row);
+      rows.add(new BeamRecord(beamSQLRowType, fieldsValue));
     }
 
     return inputPCollections.getPipeline().apply(stageName, Create.of(rows))

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index ce5444f..c6a5d26 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -75,19 +75,15 @@ public class BeamAggregationTransforms implements Serializable{
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
-      BeamRecord outRecord = new BeamRecord(outRowType);
-
+      List<Object> fieldValues = new ArrayList<>();
       KV<BeamRecord, BeamRecord> kvRecord = c.element();
-      for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
-        outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
-      }
-      for (int idx = 0; idx < aggFieldNames.size(); ++idx) {
-        outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx));
-      }
+      fieldValues.addAll(kvRecord.getKey().getDataValues());
+      fieldValues.addAll(kvRecord.getValue().getDataValues());
       if (windowStartFieldIdx != -1) {
-        outRecord.addField(windowStartFieldIdx, ((IntervalWindow) window).start().toDate());
+        fieldValues.add(windowStartFieldIdx, ((IntervalWindow) window).start().toDate());
       }
 
+      BeamRecord outRecord = new BeamRecord(outRowType, fieldValues);
       c.output(outRecord);
     }
   }
@@ -111,11 +107,13 @@ public class BeamAggregationTransforms implements Serializable{
     @Override
     public BeamRecord apply(BeamRecord input) {
       BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input));
-      BeamRecord keyOfRecord = new BeamRecord(typeOfKey);
 
+      List<Object> fieldValues = new ArrayList<>(groupByKeys.size());
       for (int idx = 0; idx < groupByKeys.size(); ++idx) {
-        keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
+        fieldValues.add(input.getFieldValue(groupByKeys.get(idx)));
       }
+
+      BeamRecord keyOfRecord = new BeamRecord(typeOfKey, fieldValues);
       return keyOfRecord;
     }
 
@@ -241,11 +239,11 @@ public class BeamAggregationTransforms implements Serializable{
     }
     @Override
     public BeamRecord extractOutput(AggregationAccumulator accumulator) {
-      BeamRecord result = new BeamRecord(finalRowType);
+      List<Object> fieldValues = new ArrayList<>(aggregators.size());
       for (int idx = 0; idx < aggregators.size(); ++idx) {
-        result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
+        fieldValues.add(aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
       }
-      return result;
+      return new BeamRecord(finalRowType, fieldValues);
     }
     @Override
     public Coder<AggregationAccumulator> getAccumulatorCoder(

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 105bbf3..8f34704 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -66,12 +66,12 @@ public class BeamJoinTransforms {
       BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
 
       // build the row
-      BeamRecord row = new BeamRecord(type);
+      List<Object> fieldValues = new ArrayList<>(joinColumns.size());
       for (int i = 0; i < joinColumns.size(); i++) {
-        row.addField(i, input
+        fieldValues.add(input
             .getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue()));
       }
-      return KV.of(row, input);
+      return KV.of(new BeamRecord(type, fieldValues), input);
     }
   }
 
@@ -154,16 +154,8 @@ public class BeamJoinTransforms {
     types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldsType());
     BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
 
-    BeamRecord row = new BeamRecord(type);
-    // build the row
-    for (int i = 0; i < leftRow.size(); i++) {
-      row.addField(i, leftRow.getFieldValue(i));
-    }
-
-    for (int i = 0; i < rightRow.size(); i++) {
-      row.addField(i + leftRow.size(), rightRow.getFieldValue(i));
-    }
-
-    return row;
+    List<Object> fieldValues = new ArrayList<>(leftRow.getDataValues());
+    fieldValues.addAll(rightRow.getDataValues());
+    return new BeamRecord(type, fieldValues);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
index 45dc621..34d6dbb 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.transform;
 
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
@@ -53,12 +54,12 @@ public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> {
   public void processElement(ProcessContext c, BoundedWindow window) {
     BeamRecord inputRow = c.element();
     List<Object> results = executor.execute(inputRow, window);
-
-    BeamRecord outRow = new BeamRecord(outputRowType);
-
+    List<Object> fieldsValue = new ArrayList<>(results.size());
     for (int idx = 0; idx < results.size(); ++idx) {
-      BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));
+      fieldsValue.add(
+          BeamTableUtils.autoCastField(outputRowType.getFieldsType().get(idx), results.get(idx)));
     }
+    BeamRecord outRow = new BeamRecord(outputRowType, fieldsValue);
 
     c.output(outRow);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
index fe82834..b7c7438 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
@@ -84,7 +84,10 @@ public class BeamSqlRecordType extends BeamRecordType {
 
   public static BeamSqlRecordType create(List<String> fieldNames,
       List<Integer> fieldTypes) {
-    List<Coder> fieldCoders = new ArrayList<>();
+    if (fieldNames.size() != fieldTypes.size()) {
+      throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match.");
+    }
+    List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
     for (int idx = 0; idx < fieldTypes.size(); ++idx) {
       switch (fieldTypes.get(idx)) {
       case Types.INTEGER:

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
index 63c9720..19d3e39 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -39,7 +41,7 @@ public final class BeamTableUtils {
       CSVFormat csvFormat,
       String line,
       BeamSqlRecordType beamSqlRowType) {
-    BeamRecord row = new BeamRecord(beamSqlRowType);
+    List<Object> fieldsValue = new ArrayList<>(beamSqlRowType.size());
     try (StringReader reader = new StringReader(line)) {
       CSVParser parser = csvFormat.parse(reader);
       CSVRecord rawRecord = parser.getRecords().get(0);
@@ -52,13 +54,13 @@ public final class BeamTableUtils {
       } else {
         for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
           String raw = rawRecord.get(idx);
-          addFieldWithAutoTypeCasting(row, idx, raw);
+          fieldsValue.add(autoCastField(beamSqlRowType.getFieldsType().get(idx), raw));
         }
       }
     } catch (IOException e) {
       throw new IllegalArgumentException("decodeRecord failed!", e);
     }
-    return row;
+    return new BeamRecord(beamSqlRowType, fieldsValue);
   }
 
   public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
@@ -74,37 +76,29 @@ public final class BeamTableUtils {
     return writer.toString();
   }
 
-  public static void addFieldWithAutoTypeCasting(BeamRecord row, int idx, Object rawObj) {
+  public static Object autoCastField(int fieldType, Object rawObj) {
     if (rawObj == null) {
-      row.addField(idx, null);
-      return;
+      return null;
     }
 
-    SqlTypeName columnType = CalciteUtils.getFieldType(BeamSqlRecordHelper.getSqlRecordType(row)
-        , idx);
+    SqlTypeName columnType = CalciteUtils.toCalciteType(fieldType);
     // auto-casting for numberics
     if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
         || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
       String raw = rawObj.toString();
       switch (columnType) {
         case TINYINT:
-          row.addField(idx, Byte.valueOf(raw));
-          break;
+          return Byte.valueOf(raw);
         case SMALLINT:
-          row.addField(idx, Short.valueOf(raw));
-          break;
+          return Short.valueOf(raw);
         case INTEGER:
-          row.addField(idx, Integer.valueOf(raw));
-          break;
+          return Integer.valueOf(raw);
         case BIGINT:
-          row.addField(idx, Long.valueOf(raw));
-          break;
+          return Long.valueOf(raw);
         case FLOAT:
-          row.addField(idx, Float.valueOf(raw));
-          break;
+          return Float.valueOf(raw);
         case DOUBLE:
-          row.addField(idx, Double.valueOf(raw));
-          break;
+          return Double.valueOf(raw);
         default:
           throw new UnsupportedOperationException(
               String.format("Column type %s is not supported yet!", columnType));
@@ -112,13 +106,12 @@ public final class BeamTableUtils {
     } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
       // convert NlsString to String
       if (rawObj instanceof NlsString) {
-        row.addField(idx, ((NlsString) rawObj).getValue());
+        return ((NlsString) rawObj).getValue();
       } else {
-        row.addField(idx, rawObj);
+        return rawObj;
       }
     } else {
-      // keep the origin
-      row.addField(idx, rawObj);
+      return rawObj;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 71278ec..19ca398 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -57,9 +57,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("f_int2", 0);
-    record.addField("size", 4L);
+    BeamRecord record = new BeamRecord(resultType, 0, 4L);
 
     PAssert.that(result).containsInAnyOrder(record);
 
@@ -107,37 +105,14 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
             Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
             Types.TIMESTAMP, Types.TIMESTAMP));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("f_int2", 0);
-    record.addField("size", 4L);
-
-    record.addField("sum1", 10000L);
-    record.addField("avg1", 2500L);
-    record.addField("max1", 4000L);
-    record.addField("min1", 1000L);
-
-    record.addField("sum2", (short) 10);
-    record.addField("avg2", (short) 2);
-    record.addField("max2", (short) 4);
-    record.addField("min2", (short) 1);
-
-    record.addField("sum3", (byte) 10);
-    record.addField("avg3", (byte) 2);
-    record.addField("max3", (byte) 4);
-    record.addField("min3", (byte) 1);
-
-    record.addField("sum4", 10.0F);
-    record.addField("avg4", 2.5F);
-    record.addField("max4", 4.0F);
-    record.addField("min4", 1.0F);
-
-    record.addField("sum5", 10.0);
-    record.addField("avg5", 2.5);
-    record.addField("max5", 4.0);
-    record.addField("min5", 1.0);
-
-    record.addField("max6", FORMAT.parse("2017-01-01 02:04:03"));
-    record.addField("min6", FORMAT.parse("2017-01-01 01:01:03"));
+    BeamRecord record = new BeamRecord(resultType
+        , 0, 4L
+        , 10000L, 2500L, 4000L, 1000L
+        , (short) 10, (short) 2, (short) 4, (short) 1
+        , (byte) 10, (byte) 2, (byte) 4, (byte) 1
+        , 10.0F, 2.5F, 4.0F, 1.0F
+        , 10.0, 2.5, 4.0, 1.0
+        , FORMAT.parse("2017-01-01 02:04:03"), FORMAT.parse("2017-01-01 01:01:03"));
 
     PAssert.that(result).containsInAnyOrder(record);
 
@@ -169,21 +144,10 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int", 1);
-    record1.addField("f_long", 1000L);
-
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int", 2);
-    record2.addField("f_long", 2000L);
-
-    BeamRecord record3 = new BeamRecord(resultType);
-    record3.addField("f_int", 3);
-    record3.addField("f_long", 3000L);
-
-    BeamRecord record4 = new BeamRecord(resultType);
-    record4.addField("f_int", 4);
-    record4.addField("f_long", 4000L);
+    BeamRecord record1 = new BeamRecord(resultType, 1, 1000L);
+    BeamRecord record2 = new BeamRecord(resultType, 2, 2000L);
+    BeamRecord record3 = new BeamRecord(resultType, 3, 3000L);
+    BeamRecord record4 = new BeamRecord(resultType, 4, 4000L);
 
     PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
 
@@ -219,15 +183,8 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int2", 0);
-    record1.addField("size", 3L);
-    record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int2", 0);
-    record2.addField("size", 1L);
-    record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
+    BeamRecord record1 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:00:00"));
+    BeamRecord record2 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:00:00"));
 
     PAssert.that(result).containsInAnyOrder(record1, record2);
 
@@ -262,25 +219,10 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int2", 0);
-    record1.addField("size", 3L);
-    record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00"));
-
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int2", 0);
-    record2.addField("size", 3L);
-    record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-
-    BeamRecord record3 = new BeamRecord(resultType);
-    record3.addField("f_int2", 0);
-    record3.addField("size", 1L);
-    record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00"));
-
-    BeamRecord record4 = new BeamRecord(resultType);
-    record4.addField("f_int2", 0);
-    record4.addField("size", 1L);
-    record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
+    BeamRecord record1 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 00:30:00"));
+    BeamRecord record2 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:00:00"));
+    BeamRecord record3 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 01:30:00"));
+    BeamRecord record4 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:00:00"));
 
     PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
 
@@ -316,15 +258,8 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int2", 0);
-    record1.addField("size", 3L);
-    record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03"));
-
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int2", 0);
-    record2.addField("size", 1L);
-    record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03"));
+    BeamRecord record1 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:01:03"));
+    BeamRecord record2 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:04:03"));
 
     PAssert.that(result).containsInAnyOrder(record1, record2);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index d09caf0..02427ae 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -112,56 +112,24 @@ public class BeamSqlDslBase {
   private static List<BeamRecord> prepareInputRowsInTableA() throws ParseException{
     List<BeamRecord> rows = new ArrayList<>();
 
-    BeamRecord row1 = new BeamRecord(rowTypeInTableA);
-    row1.addField(0, 1);
-    row1.addField(1, 1000L);
-    row1.addField(2, Short.valueOf("1"));
-    row1.addField(3, Byte.valueOf("1"));
-    row1.addField(4, 1.0f);
-    row1.addField(5, 1.0);
-    row1.addField(6, "string_row1");
-    row1.addField(7, FORMAT.parse("2017-01-01 01:01:03"));
-    row1.addField(8, 0);
-    row1.addField(9, new BigDecimal(1));
+    BeamRecord row1 = new BeamRecord(rowTypeInTableA
+        , 1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0f, 1.0, "string_row1"
+        , FORMAT.parse("2017-01-01 01:01:03"), 0, new BigDecimal(1));
     rows.add(row1);
 
-    BeamRecord row2 = new BeamRecord(rowTypeInTableA);
-    row2.addField(0, 2);
-    row2.addField(1, 2000L);
-    row2.addField(2, Short.valueOf("2"));
-    row2.addField(3, Byte.valueOf("2"));
-    row2.addField(4, 2.0f);
-    row2.addField(5, 2.0);
-    row2.addField(6, "string_row2");
-    row2.addField(7, FORMAT.parse("2017-01-01 01:02:03"));
-    row2.addField(8, 0);
-    row2.addField(9, new BigDecimal(2));
+    BeamRecord row2 = new BeamRecord(rowTypeInTableA
+        , 2, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0f, 2.0, "string_row2"
+        , FORMAT.parse("2017-01-01 01:02:03"), 0, new BigDecimal(2));
     rows.add(row2);
 
-    BeamRecord row3 = new BeamRecord(rowTypeInTableA);
-    row3.addField(0, 3);
-    row3.addField(1, 3000L);
-    row3.addField(2, Short.valueOf("3"));
-    row3.addField(3, Byte.valueOf("3"));
-    row3.addField(4, 3.0f);
-    row3.addField(5, 3.0);
-    row3.addField(6, "string_row3");
-    row3.addField(7, FORMAT.parse("2017-01-01 01:06:03"));
-    row3.addField(8, 0);
-    row3.addField(9, new BigDecimal(3));
+    BeamRecord row3 = new BeamRecord(rowTypeInTableA
+        , 3, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0f, 3.0, "string_row3"
+        , FORMAT.parse("2017-01-01 01:06:03"), 0, new BigDecimal(3));
     rows.add(row3);
 
-    BeamRecord row4 = new BeamRecord(rowTypeInTableA);
-    row4.addField(0, 4);
-    row4.addField(1, 4000L);
-    row4.addField(2, Short.valueOf("4"));
-    row4.addField(3, Byte.valueOf("4"));
-    row4.addField(4, 4.0f);
-    row4.addField(5, 4.0);
-    row4.addField(6, "string_row4");
-    row4.addField(7, FORMAT.parse("2017-01-01 02:04:03"));
-    row4.addField(8, 0);
-    row4.addField(9, new BigDecimal(4));
+    BeamRecord row4 = new BeamRecord(rowTypeInTableA
+        , 4, 4000L, Short.valueOf("4"), Byte.valueOf("4"), 4.0f, 4.0, "string_row4"
+        , FORMAT.parse("2017-01-01 02:04:03"), 0, new BigDecimal(4));
     rows.add(row4);
 
     return rows;

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index ddb90d5..c8041a8 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -84,9 +84,8 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
-    record.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+    BeamRecord record = new BeamRecord(resultType
+        , recordsInTableA.get(0).getFieldValue(0), recordsInTableA.get(0).getFieldValue(1));
 
     PAssert.that(result).containsInAnyOrder(record);
 
@@ -119,21 +118,17 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
-    record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+    BeamRecord record1 = new BeamRecord(resultType
+        , recordsInTableA.get(0).getFieldValue(0), recordsInTableA.get(0).getFieldValue(1));
 
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
-    record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+    BeamRecord record2 = new BeamRecord(resultType
+        , recordsInTableA.get(1).getFieldValue(0), recordsInTableA.get(1).getFieldValue(1));
 
-    BeamRecord record3 = new BeamRecord(resultType);
-    record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
-    record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+    BeamRecord record3 = new BeamRecord(resultType
+        , recordsInTableA.get(2).getFieldValue(0), recordsInTableA.get(2).getFieldValue(1));
 
-    BeamRecord record4 = new BeamRecord(resultType);
-    record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
-    record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+    BeamRecord record4 = new BeamRecord(resultType
+        , recordsInTableA.get(3).getFieldValue(0), recordsInTableA.get(3).getFieldValue(1));
 
     PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
 
@@ -166,21 +161,17 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
-    record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+    BeamRecord record1 = new BeamRecord(resultType
+        , recordsInTableA.get(0).getFieldValue(0), recordsInTableA.get(0).getFieldValue(1));
 
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
-    record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+    BeamRecord record2 = new BeamRecord(resultType
+        , recordsInTableA.get(1).getFieldValue(0), recordsInTableA.get(1).getFieldValue(1));
 
-    BeamRecord record3 = new BeamRecord(resultType);
-    record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
-    record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+    BeamRecord record3 = new BeamRecord(resultType
+        , recordsInTableA.get(2).getFieldValue(0), recordsInTableA.get(2).getFieldValue(1));
 
-    BeamRecord record4 = new BeamRecord(resultType);
-    record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
-    record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+    BeamRecord record4 = new BeamRecord(resultType
+        , recordsInTableA.get(3).getFieldValue(0), recordsInTableA.get(3).getFieldValue(1));
 
     PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
 
@@ -213,8 +204,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
         Arrays.asList(Types.INTEGER));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("literal_field", 1);
+    BeamRecord record = new BeamRecord(resultType, 1);
 
     PAssert.that(result).containsInAnyOrder(record);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index e3c6aec..25e76e9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -42,9 +42,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("f_int2", 0);
-    record.addField("squaresum", 30);
+    BeamRecord record = new BeamRecord(resultType, 0, 30);
 
     String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`"
         + " FROM PCOLLECTION GROUP BY f_int2";
@@ -72,9 +70,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("f_int", 2);
-    record.addField("cubicvalue", 8);
+    BeamRecord record = new BeamRecord(resultType, 2, 8);
 
     String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
     PCollection<BeamRecord> result1 =

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 63b6ca8..e9dc88f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -184,11 +184,7 @@ public class TestUtils {
     int fieldCount = type.size();
 
     for (int i = 0; i < args.size(); i += fieldCount) {
-      BeamRecord row = new BeamRecord(type);
-      for (int j = 0; j < fieldCount; j++) {
-        row.addField(j, args.get(i + j));
-      }
-      rows.add(row);
+      rows.add(new BeamRecord(type, args.subList(i, i + fieldCount)));
     }
     return rows;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index 4da7790..86e2ca4 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -71,12 +71,8 @@ public class BeamSqlFnExecutorTestBase {
         .add("order_time", SqlTypeName.BIGINT).build();
 
     beamRowType = CalciteUtils.toBeamRowType(relDataType);
-    record = new BeamRecord(beamRowType);
-
-    record.addField(0, 1234567L);
-    record.addField(1, 0);
-    record.addField(2, 8.9);
-    record.addField(3, 1234567L);
+    record = new BeamRecord(beamRowType
+        , 1234567L, 0, 8.9, 1234567L);
 
     SchemaPlus schema = Frameworks.createRootSchema(true);
     final List<RelTraitDef> traitDefs = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
index 08f98c3..7492434 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
@@ -62,20 +62,12 @@ public class BeamSqlRowCoderTest {
     BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(
         protoRowType.apply(new JavaTypeFactoryImpl(
             RelDataTypeSystem.DEFAULT)));
-    BeamRecord row = new BeamRecord(beamSQLRowType);
-    row.addField("col_tinyint", Byte.valueOf("1"));
-    row.addField("col_smallint", Short.valueOf("1"));
-    row.addField("col_integer", 1);
-    row.addField("col_bigint", 1L);
-    row.addField("col_float", 1.1F);
-    row.addField("col_double", 1.1);
-    row.addField("col_decimal", BigDecimal.ZERO);
-    row.addField("col_string_varchar", "hello");
+
     GregorianCalendar calendar = new GregorianCalendar();
     calendar.setTime(new Date());
-    row.addField("col_time", calendar);
-    row.addField("col_timestamp", new Date());
-    row.addField("col_boolean", true);
+    BeamRecord row = new BeamRecord(beamSQLRowType
+        , Byte.valueOf("1"), Short.valueOf("1"), 1, 1L, 1.1F, 1.1
+        , BigDecimal.ZERO, "hello", calendar, new Date(), true);
 
 
     BeamRecordCoder coder = beamSQLRowType.getRecordCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/fa36b128/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
index 2fc013d..cb6121a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
@@ -45,18 +45,14 @@ import org.junit.Test;
 public class BeamKafkaCSVTableTest {
   @Rule
   public TestPipeline pipeline = TestPipeline.create();
-  public static BeamRecord row1 = new BeamRecord(genRowType());
-  public static BeamRecord row2 = new BeamRecord(genRowType());
+  public static BeamRecord row1;
+  public static BeamRecord row2;
 
   @BeforeClass
   public static void setUp() {
-    row1.addField(0, 1L);
-    row1.addField(1, 1);
-    row1.addField(2, 1.0);
+    row1 = new BeamRecord(genRowType(), 1L, 1, 1.0);
 
-    row2.addField(0, 2L);
-    row2.addField(1, 2);
-    row2.addField(2, 2.0);
+    row2 = new BeamRecord(genRowType(), 2L, 2, 2.0);
   }
 
   @Test public void testCsvRecorderDecoder() throws Exception {

[2/2] beam git commit: [BEAM-2730] This closes #3692

Posted by ta...@apache.org.
[BEAM-2730] This closes #3692


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/66286749
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/66286749
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/66286749

Branch: refs/heads/DSL_SQL
Commit: 66286749f2d64eb3f25d99b4d9bf56c49d9f62b6
Parents: 926c70a fa36b12
Author: Tyler Akidau <ta...@apache.org>
Authored: Tue Aug 8 19:07:56 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Tue Aug 8 19:07:56 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/BeamRecordCoder.java |  10 +-
 .../org/apache/beam/sdk/values/BeamRecord.java  |  24 ++---
 .../extensions/sql/example/BeamSqlExample.java  |   5 +-
 .../extensions/sql/impl/rel/BeamJoinRel.java    |   7 +-
 .../extensions/sql/impl/rel/BeamValuesRel.java  |   7 +-
 .../transform/BeamAggregationTransforms.java    |  26 +++--
 .../sql/impl/transform/BeamJoinTransforms.java  |  20 ++--
 .../sql/impl/transform/BeamSqlProjectFn.java    |   9 +-
 .../sql/schema/BeamSqlRecordType.java           |   5 +-
 .../extensions/sql/schema/BeamTableUtils.java   |  41 +++----
 .../sql/BeamSqlDslAggregationTest.java          | 107 ++++---------------
 .../beam/sdk/extensions/sql/BeamSqlDslBase.java |  56 +++-------
 .../extensions/sql/BeamSqlDslProjectTest.java   |  48 ++++-----
 .../extensions/sql/BeamSqlDslUdfUdafTest.java   |   8 +-
 .../beam/sdk/extensions/sql/TestUtils.java      |   6 +-
 .../interpreter/BeamSqlFnExecutorTestBase.java  |   8 +-
 .../sql/schema/BeamSqlRowCoderTest.java         |  16 +--
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java |  12 +--
 18 files changed, 132 insertions(+), 283 deletions(-)
----------------------------------------------------------------------