You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/06/16 21:21:10 UTC
[1/2] beam git commit: [BEAM-2443] apply AutoValue to BeamSqlRecordType
Repository: beam
Updated Branches:
refs/heads/DSL_SQL abe0f1a0a -> dcd769c8a
[BEAM-2443] apply AutoValue to BeamSqlRecordType
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/20453733
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/20453733
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/20453733
Branch: refs/heads/DSL_SQL
Commit: 20453733ea8679e9fe421950a69921ace80dd381
Parents: abe0f1a
Author: James Xu <xu...@gmail.com>
Authored: Fri Jun 16 14:31:55 2017 +0800
Committer: Tyler Akidau <ta...@apache.org>
Committed: Fri Jun 16 14:19:02 2017 -0700
----------------------------------------------------------------------
dsls/sql/pom.xml | 5 +++
.../beam/dsls/sql/example/BeamSqlExample.java | 9 ++---
.../beam/dsls/sql/rel/BeamAggregationRel.java | 20 ++++++-----
.../beam/dsls/sql/schema/BeamSqlRecordType.java | 38 +++++---------------
.../transform/BeamAggregationTransforms.java | 22 +++++++-----
.../beam/dsls/sql/utils/CalciteUtils.java | 11 +++---
6 files changed, 51 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index e70c88c..d866313 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -190,5 +190,10 @@
<version>0.10.1.0</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 6bb1617..31f8302 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -18,6 +18,8 @@
package org.apache.beam.dsls.sql.example;
import java.sql.Types;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.dsls.sql.BeamSql;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -47,10 +49,9 @@ class BeamSqlExample {
Pipeline p = Pipeline.create(options);
//define the input row format
- BeamSqlRecordType type = new BeamSqlRecordType();
- type.addField("c1", Types.INTEGER);
- type.addField("c2", Types.VARCHAR);
- type.addField("c3", Types.DOUBLE);
+ List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
+ List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
+ BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
BeamSqlRow row = new BeamSqlRow(type);
row.addField(0, 1);
row.addField(1, "row");
http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 595563d..bcdc44f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.dsls.sql.rel;
+import java.util.ArrayList;
import java.util.List;
-
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
@@ -125,25 +125,29 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
*/
private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
BeamSqlRecordType inputRecordType = CalciteUtils.toBeamRecordType(relDataType);
- BeamSqlRecordType typeOfKey = new BeamSqlRecordType();
+ List<String> fieldNames = new ArrayList<>();
+ List<Integer> fieldTypes = new ArrayList<>();
for (int i : groupSet.asList()) {
if (i != windowFieldIdx) {
- typeOfKey.addField(inputRecordType.getFieldsName().get(i),
- inputRecordType.getFieldsType().get(i));
+ fieldNames.add(inputRecordType.getFieldsName().get(i));
+ fieldTypes.add(inputRecordType.getFieldsType().get(i));
}
}
- return typeOfKey;
+ return BeamSqlRecordType.create(fieldNames, fieldTypes);
}
/**
* Type of sub-rowrecord, that represents the list of aggregation fields.
*/
private BeamSqlRecordType exAggFieldsSchema() {
- BeamSqlRecordType typeOfAggFields = new BeamSqlRecordType();
+ List<String> fieldNames = new ArrayList<>();
+ List<Integer> fieldTypes = new ArrayList<>();
for (AggregateCall ac : getAggCallList()) {
- typeOfAggFields.addField(ac.name, CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
+ fieldNames.add(ac.name);
+ fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
}
- return typeOfAggFields;
+
+ return BeamSqlRecordType.create(fieldNames, fieldTypes);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
index 08ba39f..9fc3945 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
@@ -17,46 +17,24 @@
*/
package org.apache.beam.dsls.sql.schema;
+import com.google.auto.value.AutoValue;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
/**
* Field type information in {@link BeamSqlRow}.
*
*/
-public class BeamSqlRecordType implements Serializable {
- private List<String> fieldsName = new ArrayList<>();
- private List<Integer> fieldsType = new ArrayList<>();
+@AutoValue
+public abstract class BeamSqlRecordType implements Serializable {
+ public abstract List<String> getFieldsName();
+ public abstract List<Integer> getFieldsType();
- public void addField(String fieldName, Integer fieldType) {
- fieldsName.add(fieldName);
- fieldsType.add(fieldType);
+ public static BeamSqlRecordType create(List<String> fieldNames, List<Integer> fieldTypes) {
+ return new AutoValue_BeamSqlRecordType(fieldNames, fieldTypes);
}
public int size() {
- return fieldsName.size();
+ return getFieldsName().size();
}
-
- public List<String> getFieldsName() {
- return fieldsName;
- }
-
- public void setFieldsName(List<String> fieldsName) {
- this.fieldsName = fieldsName;
- }
-
- public List<Integer> getFieldsType() {
- return fieldsType;
- }
-
- public void setFieldsType(List<Integer> fieldsType) {
- this.fieldsType = fieldsType;
- }
-
- @Override
- public String toString() {
- return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]";
- }
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
index e804b94..83d473a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
@@ -106,13 +106,14 @@ public class BeamAggregationTransforms implements Serializable{
}
private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
- BeamSqlRecordType typeOfKey = new BeamSqlRecordType();
+ List<String> fieldNames = new ArrayList<>();
+ List<Integer> fieldTypes = new ArrayList<>();
for (int idx : groupByKeys) {
- typeOfKey.addField(dataType.getFieldsName().get(idx), dataType.getFieldsType().get(idx));
+ fieldNames.add(dataType.getFieldsName().get(idx));
+ fieldTypes.add(dataType.getFieldsType().get(idx));
}
- return typeOfKey;
+ return BeamSqlRecordType.create(fieldNames, fieldTypes);
}
-
}
/**
@@ -152,19 +153,21 @@ public class BeamAggregationTransforms implements Serializable{
public AggregationCombineFn(List<AggregateCall> aggregationCalls,
BeamSqlRecordType sourceRowRecordType) {
- this.aggDataType = new BeamSqlRecordType();
this.aggFunctions = new ArrayList<>();
this.aggElementExpressions = new ArrayList<>();
boolean hasAvg = false;
boolean hasCount = false;
int countIndex = -1;
+ List<String> fieldNames = new ArrayList<>();
+ List<Integer> fieldTypes = new ArrayList<>();
for (int idx = 0; idx < aggregationCalls.size(); ++idx) {
AggregateCall ac = aggregationCalls.get(idx);
//verify it's supported.
verifySupportedAggregation(ac);
- aggDataType.addField(ac.name, CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
+ fieldNames.add(ac.name);
+ fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
SqlAggFunction aggFn = ac.getAggregation();
switch (aggFn.getName()) {
@@ -190,10 +193,12 @@ public class BeamAggregationTransforms implements Serializable{
}
aggFunctions.add(aggFn.getName());
}
+
+
// add a COUNT holder if only have AVG
if (hasAvg && !hasCount) {
- aggDataType.addField("__COUNT",
- CalciteUtils.toJavaType(SqlTypeName.BIGINT));
+ fieldNames.add("__COUNT");
+ fieldTypes.add(CalciteUtils.toJavaType(SqlTypeName.BIGINT));
aggFunctions.add("COUNT");
aggElementExpressions.add(BeamSqlPrimitive.<Long>of(SqlTypeName.BIGINT, 1L));
@@ -202,6 +207,7 @@ public class BeamAggregationTransforms implements Serializable{
countIndex = aggDataType.size() - 1;
}
+ this.aggDataType = BeamSqlRecordType.create(fieldNames, fieldTypes);
this.countIndex = countIndex;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
index 46b4911..69ca44b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
@@ -19,7 +19,9 @@
package org.apache.beam.dsls.sql.utils;
import java.sql.Types;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.calcite.rel.type.RelDataType;
@@ -82,12 +84,13 @@ public class CalciteUtils {
* Generate {@code BeamSqlRecordType} from {@code RelDataType} which is used to create table.
*/
public static BeamSqlRecordType toBeamRecordType(RelDataType tableInfo) {
- BeamSqlRecordType record = new BeamSqlRecordType();
+ List<String> fieldNames = new ArrayList<>();
+ List<Integer> fieldTypes = new ArrayList<>();
for (RelDataTypeField f : tableInfo.getFieldList()) {
- record.getFieldsName().add(f.getName());
- record.getFieldsType().add(toJavaType(f.getType().getSqlTypeName()));
+ fieldNames.add(f.getName());
+ fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
}
- return record;
+ return BeamSqlRecordType.create(fieldNames, fieldTypes);
}
/**
[2/2] beam git commit: [BEAM-2443] This closes #3377
Posted by ta...@apache.org.
[BEAM-2443] This closes #3377
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dcd769c8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dcd769c8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dcd769c8
Branch: refs/heads/DSL_SQL
Commit: dcd769c8a53daac25b4cb64f7418e29c6687c1df
Parents: abe0f1a 2045373
Author: Tyler Akidau <ta...@apache.org>
Authored: Fri Jun 16 14:20:27 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Fri Jun 16 14:20:27 2017 -0700
----------------------------------------------------------------------
dsls/sql/pom.xml | 5 +++
.../beam/dsls/sql/example/BeamSqlExample.java | 9 ++---
.../beam/dsls/sql/rel/BeamAggregationRel.java | 20 ++++++-----
.../beam/dsls/sql/schema/BeamSqlRecordType.java | 38 +++++---------------
.../transform/BeamAggregationTransforms.java | 22 +++++++-----
.../beam/dsls/sql/utils/CalciteUtils.java | 11 +++---
6 files changed, 51 insertions(+), 54 deletions(-)
----------------------------------------------------------------------