You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/06/16 21:21:10 UTC

[1/2] beam git commit: [BEAM-2443] apply AutoValue to BeamSqlRecordType

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL abe0f1a0a -> dcd769c8a


[BEAM-2443] apply AutoValue to BeamSqlRecordType


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/20453733
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/20453733
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/20453733

Branch: refs/heads/DSL_SQL
Commit: 20453733ea8679e9fe421950a69921ace80dd381
Parents: abe0f1a
Author: James Xu <xu...@gmail.com>
Authored: Fri Jun 16 14:31:55 2017 +0800
Committer: Tyler Akidau <ta...@apache.org>
Committed: Fri Jun 16 14:19:02 2017 -0700

----------------------------------------------------------------------
 dsls/sql/pom.xml                                |  5 +++
 .../beam/dsls/sql/example/BeamSqlExample.java   |  9 ++---
 .../beam/dsls/sql/rel/BeamAggregationRel.java   | 20 ++++++-----
 .../beam/dsls/sql/schema/BeamSqlRecordType.java | 38 +++++---------------
 .../transform/BeamAggregationTransforms.java    | 22 +++++++-----
 .../beam/dsls/sql/utils/CalciteUtils.java       | 11 +++---
 6 files changed, 51 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index e70c88c..d866313 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -190,5 +190,10 @@
       <version>0.10.1.0</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 6bb1617..31f8302 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -18,6 +18,8 @@
 package org.apache.beam.dsls.sql.example;
 
 import java.sql.Types;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.dsls.sql.BeamSql;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -47,10 +49,9 @@ class BeamSqlExample {
     Pipeline p = Pipeline.create(options);
 
     //define the input row format
-    BeamSqlRecordType type = new BeamSqlRecordType();
-    type.addField("c1", Types.INTEGER);
-    type.addField("c2", Types.VARCHAR);
-    type.addField("c3", Types.DOUBLE);
+    List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
+    List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
+    BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
     BeamSqlRow row = new BeamSqlRow(type);
     row.addField(0, 1);
     row.addField(1, "row");

http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 595563d..bcdc44f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.dsls.sql.rel;
 
+import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
@@ -125,25 +125,29 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
    */
   private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
     BeamSqlRecordType inputRecordType = CalciteUtils.toBeamRecordType(relDataType);
-    BeamSqlRecordType typeOfKey = new BeamSqlRecordType();
+    List<String> fieldNames = new ArrayList<>();
+    List<Integer> fieldTypes = new ArrayList<>();
     for (int i : groupSet.asList()) {
       if (i != windowFieldIdx) {
-        typeOfKey.addField(inputRecordType.getFieldsName().get(i),
-            inputRecordType.getFieldsType().get(i));
+        fieldNames.add(inputRecordType.getFieldsName().get(i));
+        fieldTypes.add(inputRecordType.getFieldsType().get(i));
       }
     }
-    return typeOfKey;
+    return BeamSqlRecordType.create(fieldNames, fieldTypes);
   }
 
   /**
    * Type of sub-rowrecord, that represents the list of aggregation fields.
    */
   private BeamSqlRecordType exAggFieldsSchema() {
-    BeamSqlRecordType typeOfAggFields = new BeamSqlRecordType();
+    List<String> fieldNames = new ArrayList<>();
+    List<Integer> fieldTypes = new ArrayList<>();
     for (AggregateCall ac : getAggCallList()) {
-      typeOfAggFields.addField(ac.name, CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
+      fieldNames.add(ac.name);
+      fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
     }
-    return typeOfAggFields;
+
+    return BeamSqlRecordType.create(fieldNames, fieldTypes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
index 08ba39f..9fc3945 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
@@ -17,46 +17,24 @@
  */
 package org.apache.beam.dsls.sql.schema;
 
+import com.google.auto.value.AutoValue;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 
 /**
  * Field type information in {@link BeamSqlRow}.
  *
  */
-public class BeamSqlRecordType implements Serializable {
-  private List<String> fieldsName = new ArrayList<>();
-  private List<Integer> fieldsType = new ArrayList<>();
+@AutoValue
+public abstract class BeamSqlRecordType implements Serializable {
+  public abstract List<String> getFieldsName();
+  public abstract List<Integer> getFieldsType();
 
-  public void addField(String fieldName, Integer fieldType) {
-    fieldsName.add(fieldName);
-    fieldsType.add(fieldType);
+  public static BeamSqlRecordType create(List<String> fieldNames, List<Integer> fieldTypes) {
+    return new AutoValue_BeamSqlRecordType(fieldNames, fieldTypes);
   }
 
   public int size() {
-    return fieldsName.size();
+    return getFieldsName().size();
   }
-
-  public List<String> getFieldsName() {
-    return fieldsName;
-  }
-
-  public void setFieldsName(List<String> fieldsName) {
-    this.fieldsName = fieldsName;
-  }
-
-  public List<Integer> getFieldsType() {
-    return fieldsType;
-  }
-
-  public void setFieldsType(List<Integer> fieldsType) {
-    this.fieldsType = fieldsType;
-  }
-
-  @Override
-  public String toString() {
-    return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]";
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
index e804b94..83d473a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
@@ -106,13 +106,14 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
-      BeamSqlRecordType typeOfKey = new BeamSqlRecordType();
+      List<String> fieldNames = new ArrayList<>();
+      List<Integer> fieldTypes = new ArrayList<>();
       for (int idx : groupByKeys) {
-        typeOfKey.addField(dataType.getFieldsName().get(idx), dataType.getFieldsType().get(idx));
+        fieldNames.add(dataType.getFieldsName().get(idx));
+        fieldTypes.add(dataType.getFieldsType().get(idx));
       }
-      return typeOfKey;
+      return BeamSqlRecordType.create(fieldNames, fieldTypes);
     }
-
   }
 
   /**
@@ -152,19 +153,21 @@ public class BeamAggregationTransforms implements Serializable{
 
     public AggregationCombineFn(List<AggregateCall> aggregationCalls,
         BeamSqlRecordType sourceRowRecordType) {
-      this.aggDataType = new BeamSqlRecordType();
       this.aggFunctions = new ArrayList<>();
       this.aggElementExpressions = new ArrayList<>();
 
       boolean hasAvg = false;
       boolean hasCount = false;
       int countIndex = -1;
+      List<String> fieldNames = new ArrayList<>();
+      List<Integer> fieldTypes = new ArrayList<>();
       for (int idx = 0; idx < aggregationCalls.size(); ++idx) {
         AggregateCall ac = aggregationCalls.get(idx);
         //verify it's supported.
         verifySupportedAggregation(ac);
 
-        aggDataType.addField(ac.name, CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
+        fieldNames.add(ac.name);
+        fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
 
         SqlAggFunction aggFn = ac.getAggregation();
         switch (aggFn.getName()) {
@@ -190,10 +193,12 @@ public class BeamAggregationTransforms implements Serializable{
         }
         aggFunctions.add(aggFn.getName());
       }
+
+
       // add a COUNT holder if only have AVG
       if (hasAvg && !hasCount) {
-        aggDataType.addField("__COUNT",
-            CalciteUtils.toJavaType(SqlTypeName.BIGINT));
+        fieldNames.add("__COUNT");
+        fieldTypes.add(CalciteUtils.toJavaType(SqlTypeName.BIGINT));
 
         aggFunctions.add("COUNT");
         aggElementExpressions.add(BeamSqlPrimitive.<Long>of(SqlTypeName.BIGINT, 1L));
@@ -202,6 +207,7 @@ public class BeamAggregationTransforms implements Serializable{
         countIndex = aggDataType.size() - 1;
       }
 
+      this.aggDataType = BeamSqlRecordType.create(fieldNames, fieldTypes);
       this.countIndex = countIndex;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
index 46b4911..69ca44b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
@@ -19,7 +19,9 @@
 package org.apache.beam.dsls.sql.utils;
 
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.calcite.rel.type.RelDataType;
@@ -82,12 +84,13 @@ public class CalciteUtils {
    * Generate {@code BeamSqlRecordType} from {@code RelDataType} which is used to create table.
    */
   public static BeamSqlRecordType toBeamRecordType(RelDataType tableInfo) {
-    BeamSqlRecordType record = new BeamSqlRecordType();
+    List<String> fieldNames = new ArrayList<>();
+    List<Integer> fieldTypes = new ArrayList<>();
     for (RelDataTypeField f : tableInfo.getFieldList()) {
-      record.getFieldsName().add(f.getName());
-      record.getFieldsType().add(toJavaType(f.getType().getSqlTypeName()));
+      fieldNames.add(f.getName());
+      fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
     }
-    return record;
+    return BeamSqlRecordType.create(fieldNames, fieldTypes);
   }
 
   /**


[2/2] beam git commit: [BEAM-2443] This closes #3377

Posted by ta...@apache.org.
[BEAM-2443] This closes #3377


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dcd769c8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dcd769c8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dcd769c8

Branch: refs/heads/DSL_SQL
Commit: dcd769c8a53daac25b4cb64f7418e29c6687c1df
Parents: abe0f1a 2045373
Author: Tyler Akidau <ta...@apache.org>
Authored: Fri Jun 16 14:20:27 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Fri Jun 16 14:20:27 2017 -0700

----------------------------------------------------------------------
 dsls/sql/pom.xml                                |  5 +++
 .../beam/dsls/sql/example/BeamSqlExample.java   |  9 ++---
 .../beam/dsls/sql/rel/BeamAggregationRel.java   | 20 ++++++-----
 .../beam/dsls/sql/schema/BeamSqlRecordType.java | 38 +++++---------------
 .../transform/BeamAggregationTransforms.java    | 22 +++++++-----
 .../beam/dsls/sql/utils/CalciteUtils.java       | 11 +++---
 6 files changed, 51 insertions(+), 54 deletions(-)
----------------------------------------------------------------------