You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/06/28 01:26:47 UTC
[1/2] beam git commit: Small fixes to make the example run in a
runner agnostic way: - Add direct runner default profile - Add findbugs
validation and fix existing findbugs issues - Validate division by zero on
arithmetic expression + other minor fixes -
Repository: beam
Updated Branches:
refs/heads/DSL_SQL bd99528af -> ab4b11886
Small fixes to make the example run in a runner agnostic way:
- Add direct runner default profile
- Add findbugs validation and fix existing findbugs issues
- Validate division by zero on arithmetic expression + other minor fixes
- Update Calcite version to 1.13
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32fbc9ce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32fbc9ce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32fbc9ce
Branch: refs/heads/DSL_SQL
Commit: 32fbc9cec1d0d86e04e3f453b0d75f2ff0e61b56
Parents: bd99528
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Mon Jun 26 16:37:51 2017 +0200
Committer: Tyler Akidau <ta...@apache.org>
Committed: Tue Jun 27 18:24:41 2017 -0700
----------------------------------------------------------------------
dsls/pom.xml | 14 ++++++
dsls/sql/pom.xml | 48 +++++++++++++-------
.../java/org/apache/beam/dsls/sql/BeamSql.java | 4 +-
.../beam/dsls/sql/example/BeamSqlExample.java | 2 +-
.../interpreter/operator/BeamSqlExpression.java | 2 +-
.../interpreter/operator/BeamSqlPrimitive.java | 4 +-
.../arithmetic/BeamSqlArithmeticExpression.java | 7 ++-
.../arithmetic/BeamSqlDivideExpression.java | 3 ++
.../operator/logical/BeamSqlNotExpression.java | 1 -
.../operator/math/BeamSqlAbsExpression.java | 2 +
.../operator/math/BeamSqlRoundExpression.java | 3 +-
.../operator/math/BeamSqlSignExpression.java | 2 +
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 2 +-
.../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 3 +-
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 4 +-
.../dsls/sql/schema/BeamPCollectionTable.java | 2 +-
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 5 +-
.../beam/dsls/sql/schema/BeamTableUtils.java | 4 +-
.../transform/BeamSetOperatorsTransforms.java | 5 +-
.../interpreter/BeamSqlFnExecutorTestBase.java | 2 +-
20 files changed, 76 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/pom.xml b/dsls/pom.xml
index a741563..d932698 100644
--- a/dsls/pom.xml
+++ b/dsls/pom.xml
@@ -34,6 +34,20 @@
<module>sql</module>
</modules>
+ <profiles>
+ <profile>
+ <id>release</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
<build>
<pluginManagement>
<plugins>
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index d866313..a2279d5 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -18,11 +18,14 @@
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
<modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-dsls-parent</artifactId>
<version>2.1.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<artifactId>beam-dsls-sql</artifactId>
@@ -34,10 +37,30 @@
<properties>
<timestamp>${maven.build.timestamp}</timestamp>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
- <calcite-version>1.12.0</calcite-version>
- <avatica-version>1.9.0</avatica-version>
+ <calcite-version>1.13.0</calcite-version>
+ <avatica-version>1.10.0</avatica-version>
</properties>
+ <profiles>
+ <!--
+ The direct runner is available by default.
+ You can also include it on the classpath explicitly with -P direct-runner
+ -->
+ <profile>
+ <id>direct-runner</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
<build>
<resources>
<resource>
@@ -62,11 +85,6 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>-da</argLine> <!-- disable assert in Calcite converter validation -->
@@ -75,11 +93,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
@@ -140,11 +153,6 @@
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-direct-java</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<scope>provided</scope>
</dependency>
@@ -195,5 +203,11 @@
<artifactId>auto-value</artifactId>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index 5f90380..a0e7cbc 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -103,7 +103,7 @@ public class BeamSql {
*/
private static class QueryTransform extends
PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
- private BeamSqlEnv sqlEnv;
+ private transient BeamSqlEnv sqlEnv;
private String sqlQuery;
public QueryTransform(String sqlQuery) {
@@ -153,7 +153,7 @@ public class BeamSql {
private static class SimpleQueryTransform
extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
- BeamSqlEnv sqlEnv = new BeamSqlEnv();
+ private transient BeamSqlEnv sqlEnv = new BeamSqlEnv();
private String sqlQuery;
public SimpleQueryTransform(String sqlQuery) {
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 5f09fdd..04fe451 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -45,7 +45,7 @@ class BeamSqlExample {
private static final Logger LOG = LoggerFactory.getLogger(BeamSqlExample.class);
public static void main(String[] args) throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
+ PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
Pipeline p = Pipeline.create(options);
//define the input row format
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
index 41dac76..33feb3e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
@@ -30,7 +30,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
* as its operands, and return a value with type {@link SqlTypeName}.
*
*/
-public abstract class BeamSqlExpression implements Serializable{
+public abstract class BeamSqlExpression implements Serializable {
protected List<BeamSqlExpression> operands;
protected SqlTypeName outputType;
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index b9d1559..92d1263 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -30,7 +30,7 @@ import org.apache.calcite.util.NlsString;
* It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}.
*
*/
-public class BeamSqlPrimitive<T> extends BeamSqlExpression{
+public class BeamSqlPrimitive<T> extends BeamSqlExpression {
private T value;
private BeamSqlPrimitive() {
@@ -44,7 +44,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression{
* A builder function to create from Type and value directly.
*/
public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){
- BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<T>();
+ BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<>();
exp.outputType = outputType;
exp.value = value;
if (!exp.accept()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index 69f6f10..f3fd68f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -80,12 +80,11 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
private double getDouble(BeamSqlRow inputRecord, BeamSqlExpression op) {
Object raw = op.evaluate(inputRecord).getValue();
- Double ret = null;
if (SqlTypeName.NUMERIC_TYPES.contains(op.getOutputType())) {
- ret = ((Number) raw).doubleValue();
+ return ((Number) raw).doubleValue();
}
-
- return ret;
+ throw new IllegalStateException(
+ String.format("Can't build a valid arithmetic expression with argument %s", raw));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
index c23f54c..907b1fc 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
@@ -35,6 +35,9 @@ public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression {
}
@Override public Double calc(Number left, Number right) {
+ if (right.doubleValue() == 0) {
+ throw new IllegalArgumentException("divisor cannot be 0");
+ }
return left.doubleValue() / right.doubleValue();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
index 21b1111..ffa0184 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -40,7 +40,6 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
if (numberOfOperands() != 1) {
return false;
}
-
return super.accept();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java
index 2c6e6b4..5677fc3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java
@@ -66,6 +66,8 @@ public class BeamSqlAbsExpression extends BeamSqlMathUnaryExpression {
result = BeamSqlPrimitive
.of(SqlTypeName.DOUBLE, SqlFunctions.abs(op.getDouble()));
break;
+ default:
+ break;
}
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java
index e03b9cb..21dc09e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java
@@ -46,7 +46,6 @@ public class BeamSqlRoundExpression extends BeamSqlMathBinaryExpression {
@Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
BeamSqlPrimitive rightOp) {
BeamSqlPrimitive result = null;
-
switch (leftOp.getOutputType()) {
case SMALLINT:
result = BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
@@ -72,6 +71,8 @@ public class BeamSqlRoundExpression extends BeamSqlMathBinaryExpression {
result = BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
roundBigDecimal(toBigDecimal(leftOp.getValue()), toInt(rightOp.getValue())));
break;
+ default:
+ break;
}
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java
index 3ca42e6..311c9a0 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java
@@ -55,6 +55,8 @@ public class BeamSqlSignExpression extends BeamSqlMathUnaryExpression {
result = BeamSqlPrimitive
.of(SqlTypeName.DECIMAL, SqlFunctions.sign(SqlFunctions.toBigDecimal(op.getValue())));
break;
+ default:
+ break;
}
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 6ae8a1e..93f9a2f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -73,7 +73,7 @@ public class BeamQueryPlanner {
RelDataTypeSystem.DEFAULT);
public BeamQueryPlanner(SchemaPlus schema) {
- final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+ final List<RelTraitDef> traitDefs = new ArrayList<>();
traitDefs.add(ConventionTraitDef.INSTANCE);
traitDefs.add(RelCollationTraitDef.INSTANCE);
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
index 58539f8..d70f94a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -24,7 +24,6 @@ import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PDone;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
@@ -68,7 +67,7 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
BaseBeamTable targetTable = sqlEnv.findTable(sourceName);
- PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter());
+ upstream.apply(stageName, targetTable.buildIOWriter());
return upstream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index a664ce1..d323d82 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -44,9 +44,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
, BeamSqlEnv sqlEnv) throws Exception {
String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
- String stageName = BeamSqlRelUtils.getStageName(this);
-
- TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<BeamSqlRow>(sourceName);
+ TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<>(sourceName);
if (inputPCollections.has(sourceTupleTag)) {
//choose PCollection from input PCollectionTuple if exists there.
PCollection<BeamSqlRow> sourceStream = inputPCollections
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
index ecd0d67..8309097 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.PDone;
*/
public class BeamPCollectionTable extends BaseBeamTable {
private BeamIOType ioType;
- private PCollection<BeamSqlRow> upstream;
+ private transient PCollection<BeamSqlRow> upstream;
protected BeamPCollectionTable(BeamSqlRecordType beamSqlRecordType) {
super(beamSqlRecordType);
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index 3a67303..213dcd5 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -342,7 +342,7 @@ public class BeamSqlRow implements Serializable {
* Return data fields as key=value.
*/
public String valueInString() {
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
for (int idx = 0; idx < size(); ++idx) {
sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
}
@@ -364,4 +364,7 @@ public class BeamSqlRow implements Serializable {
return toString().equals(other.toString());
}
+ @Override public int hashCode() {
+ return toString().hashCode();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
index 79a9cb2..7157793 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -45,7 +45,7 @@ public final class BeamTableUtils {
if (rawRecord.size() != beamSqlRecordType.size()) {
throw new IllegalArgumentException(String.format(
- "Expect %d fields, but actually %d", line,
+ "Expect %d fields, but actually %d",
beamSqlRecordType.size(), rawRecord.size()
));
} else {
@@ -75,7 +75,7 @@ public final class BeamTableUtils {
public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
if (rawObj == null) {
- row.addField(idx, rawObj);
+ row.addField(idx, null);
return;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
index 56b3e14..a983cf5 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
@@ -85,9 +85,8 @@ public abstract class BeamSetOperatorsTransforms {
case INTERSECT:
if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
if (all) {
- Iterator<BeamSqlRow> iter = leftRows.iterator();
- while (iter.hasNext()) {
- ctx.output(iter.next());
+ for (BeamSqlRow leftRow : leftRows) {
+ ctx.output(leftRow);
}
} else {
ctx.output(ctx.element().getKey());
http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
index 739d548..5afd273 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
@@ -79,7 +79,7 @@ public class BeamSqlFnExecutorTestBase {
record.addField(3, 1234567L);
SchemaPlus schema = Frameworks.createRootSchema(true);
- final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+ final List<RelTraitDef> traitDefs = new ArrayList<>();
traitDefs.add(ConventionTraitDef.INSTANCE);
traitDefs.add(RelCollationTraitDef.INSTANCE);
FrameworkConfig config = Frameworks.newConfigBuilder()
[2/2] beam git commit: This closes #3439
Posted by ta...@apache.org.
This closes #3439
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ab4b1188
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ab4b1188
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ab4b1188
Branch: refs/heads/DSL_SQL
Commit: ab4b118869070e94a4205744d6d60525d3fa2882
Parents: bd99528 32fbc9c
Author: Tyler Akidau <ta...@apache.org>
Authored: Tue Jun 27 18:25:56 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Tue Jun 27 18:25:56 2017 -0700
----------------------------------------------------------------------
dsls/pom.xml | 14 ++++++
dsls/sql/pom.xml | 48 +++++++++++++-------
.../java/org/apache/beam/dsls/sql/BeamSql.java | 4 +-
.../beam/dsls/sql/example/BeamSqlExample.java | 2 +-
.../interpreter/operator/BeamSqlExpression.java | 2 +-
.../interpreter/operator/BeamSqlPrimitive.java | 4 +-
.../arithmetic/BeamSqlArithmeticExpression.java | 7 ++-
.../arithmetic/BeamSqlDivideExpression.java | 3 ++
.../operator/logical/BeamSqlNotExpression.java | 1 -
.../operator/math/BeamSqlAbsExpression.java | 2 +
.../operator/math/BeamSqlRoundExpression.java | 3 +-
.../operator/math/BeamSqlSignExpression.java | 2 +
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 2 +-
.../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 3 +-
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 4 +-
.../dsls/sql/schema/BeamPCollectionTable.java | 2 +-
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 5 +-
.../beam/dsls/sql/schema/BeamTableUtils.java | 4 +-
.../transform/BeamSetOperatorsTransforms.java | 5 +-
.../interpreter/BeamSqlFnExecutorTestBase.java | 2 +-
20 files changed, 76 insertions(+), 43 deletions(-)
----------------------------------------------------------------------